Compare commits


32 Commits

Author SHA1 Message Date
Yicheng Qin 5dcbb998f1 *: bump to v2.1.3+git 2015-09-03 13:33:49 -07:00
Yicheng Qin 30801de468 *: bump to v2.1.3 2015-09-03 13:33:27 -07:00
Yicheng Qin dbac8c8f42 etcdmain: check error before assigning peer transport
Otherwise it may panic when creating the new transport fails, e.g., when the TLS info is invalid.
2015-09-03 13:22:19 -07:00
Yicheng Qin 151c18d650 *: bump to v2.1.2+git 2015-08-21 16:20:16 -07:00
Yicheng Qin ff8d1ecb9f *: bump to v2.1.2 2015-08-21 16:19:55 -07:00
Yicheng Qin ccb67a691b pkg/netutil: stop resolving in place
Copy out a and b so that the original a and b are not modified.
2015-08-21 15:39:57 -07:00
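A minimal sketch of this copy-out pattern (a hypothetical helper, not the actual netutil code; the stand-in resolution step is an assumption):

```
package main

import (
	"fmt"
	"net/url"
)

// resolveCopy illustrates the copy-out pattern: it resolves on a copy of
// the input slice so the caller's original URLs stay unmodified.
func resolveCopy(us []url.URL) []url.URL {
	nus := make([]url.URL, len(us))
	copy(nus, us)
	for i := range nus {
		// stand-in for DNS resolution: only the copy is rewritten
		nus[i].Host = "10.0.10.1:2379"
	}
	return nus
}

func main() {
	orig := []url.URL{{Scheme: "http", Host: "example.com:2379"}}
	resolved := resolveCopy(orig)
	fmt.Println(orig[0].Host, resolved[0].Host) // example.com:2379 10.0.10.1:2379
}
```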
Yicheng Qin 059233768e pkg/netutil: do not introduce empty URL when converting
It should not create a slice with a non-zero length and then append
elements to it; doing both leaves empty leading entries.
2015-08-21 15:39:48 -07:00
Yicheng Qin c530acf6a4 pkg/netutil: do not export resolve and urlsEqual functions
They are only used in this package, so there is no need to export them.
2015-08-21 15:39:38 -07:00
Yicheng Qin bad1b20620 pkg/netutil: fix false negative comparison
Sort the resolved URLs before DeepEqual, so it will not compare URLs
that may be out of order due to resolution.
2015-08-21 15:39:29 -07:00
Yicheng Qin 89640cf08f etcdserver: remove TODO to delete URLStringsEqual
SRV discovery needs to compare IP addresses with domain names,
so the URLStringsEqual function is still required.
2015-08-21 15:39:19 -07:00
Yicheng Qin bbefb0ad0b Revert "Revert "Treat URLs have same IP address as same""
This reverts commit 3153e635d5.

Conflicts:
	etcdserver/config.go
2015-08-21 15:39:10 -07:00
Xiang Li 8e0706583c etcdmain: print out version information on startup
Conflicts:
	etcdmain/etcd.go
2015-08-21 15:38:34 -07:00
Yicheng Qin cd2a2182cf etcdctl/cluster_health: set health var when checked healthy
This was a typo.
2015-08-21 15:37:26 -07:00
Xiang Li 8d410bdfcb etcdctl: use health endpoint to greatly simplify health checking 2015-08-21 15:22:19 -07:00
Xiang Li 0a2d2b8b9d etcdctl: cluster-health supports forever flag
The cluster-health command now supports checking the cluster health
forever (until interrupted).
2015-08-21 15:22:13 -07:00
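A hypothetical invocation (flag name taken from the diff below):

```
etcdctl cluster-health --forever
```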
Yicheng Qin 6c9e876d7a etcdctl: refactor the way to check cluster health
This method uses the raft status exposed at /debug/varz to determine the
health of the cluster. It checks whether the commit index increases to
determine cluster health, and whether each member's match index increases
to determine member health.

This could fix bug #2711, which failed to detect an unhealthy follower,
because this check does not rely on whether messages on the long-polling
connection are sent.

This health check is stricter than the old one, and reflects whether
followers are healthy from the leader's point of view. For example, a
follower that is receiving a snapshot is reported unhealthy because its
index does not move forward.

`etcdctl cluster-health` reflects health at the raft level, while
connectivity checks reflect health at the transport level.
2015-08-21 15:22:05 -07:00
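A minimal sketch of the progress-based idea, assuming a hypothetical fetchIndex helper that would read the commit index from the stats endpoint:

```
package main

import (
	"fmt"
	"time"
)

// healthyByProgress reports whether the raft commit index advanced between
// two samples taken one second apart; fetchIndex is a hypothetical helper
// that reads the current commit index.
func healthyByProgress(fetchIndex func() (uint64, error)) bool {
	i0, err := fetchIndex()
	if err != nil {
		return false
	}
	time.Sleep(time.Second)
	i1, err := fetchIndex()
	if err != nil {
		return false
	}
	// a growing commit index means the cluster is committing new entries
	return i1 > i0
}

func main() {
	var idx uint64
	fetch := func() (uint64, error) { idx += 10; return idx, nil }
	fmt.Println(healthyByProgress(fetch)) // true: the fake index advances
}
```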
Xiang Li a845f82d4f etcdctl: health use etcd/client
Conflicts:
	etcdctl/command/cluster_health.go
2015-08-21 15:21:44 -07:00
Xiang Li c1c23626cb raft: downgrade the logging around snapshot to debugf
Snapshot-related logging is spammy when the leader is trying to
sync a failed peer.

Conflicts:
	raft/raft.go
2015-08-21 15:11:47 -07:00
Xiang Li ac67aa9f63 etcdhttp: write etcd error for all errors in key handler 2015-08-21 15:10:01 -07:00
Xiang Li 52c5203370 *: key handler should write auth error as etcd error 2015-08-21 15:09:53 -07:00
Yicheng Qin 27bfb3fcb2 etcdserver: improve error message when timeout due to leader fail 2015-08-21 15:09:46 -07:00
Yicheng Qin 084936a920 etcdserver: specify timeout caused by leader election
Before this PR, a timeout caused by leader election returned:

```
14:45:37 etcd2 | 2015-08-12 14:45:37.786349 E | etcdhttp: got unexpected
response error (etcdserver: request timed out)
```

After this PR:

```
15:52:54 etcd1 | 2015-08-12 15:52:54.389523 E | etcdhttp: etcdserver:
request timed out, possibly due to leader down
```

Conflicts:
	etcdserver/raft.go
2015-08-21 15:09:32 -07:00
Brandon Philips d2ecd9cecf test: race detector doesn't work on armv7l
Test fails without this fix on armv7l:

    go test: -race is only supported on linux/amd64, freebsd/amd64, darwin/amd64 and windows/amd64
2015-08-21 14:59:07 -07:00
Brandon Philips 07b82832f0 etcdserver: move atomics to make etcd work on arm64
Follow the simple rule in the atomic package:

"On both ARM and x86-32, it is the caller's responsibility to arrange
for 64-bit alignment of 64-bit words accessed atomically. The first word
in a global variable or in an allocated struct or slice can be relied
upon to be 64-bit aligned."

Tested on a system with /proc/cpuinfo reporting:

processor       : 0
model name      : ARMv7 Processor rev 1 (v7l)
Features        : swp half thumb fastmult vfp edsp thumbee neon vfpv3
tls vfpv4 idiva idivt vfpd32 lpae evtstrm
CPU implementer : 0x41
CPU architecture: 7
CPU variant     : 0x0
CPU part        : 0xc0d
CPU revision    : 1
2015-08-21 14:58:59 -07:00
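A minimal sketch of the alignment rule, with hypothetical field names (the actual rearrangement is in the raftNode diff below):

```
package main

import (
	"fmt"
	"sync/atomic"
)

// server keeps its atomically accessed uint64 fields first in the struct,
// so they are 64-bit aligned even on 32-bit platforms such as ARMv7.
type server struct {
	index uint64 // accessed atomically; must stay first for alignment
	term  uint64
	name  string
}

func main() {
	s := &server{name: "infra0"}
	atomic.AddUint64(&s.index, 1) // safe on ARM because index is aligned
	fmt.Println(s.index)
}
```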
Yicheng Qin 61f4e74652 etcdmain: reject unreasonably high values of -election-timeout
This helps users detect configuration problems early.
2015-08-21 14:58:49 -07:00
Yicheng Qin 331ecdf8c8 client: return correct error for 50x response
etcd returns a 500/503 response when it may have no leader,
so the other 50x responses should be logged in the normal way.

This helps discovery log correctly when it meets a 504 error. Before this
PR, it logged like this:

```
18:31:58 etcd2 | 2015/08/4 18:31:58 discovery: error #0: client: etcd
member https://discovery.etcd.io has no leader
18:31:58 etcd2 | 2015/08/4 18:31:58 discovery: waiting for other nodes:
error connecting to https://discovery.etcd.io, retrying in 4s
```

After this PR:

```
22:20:25 etcd2 | 2015/08/4 22:20:25 discovery: error #0: client: etcd
member https://discovery.etcd.io returns server error [Gateway Timeout]
22:20:25 etcd2 | 2015/08/4 22:20:25 discovery: waiting for other nodes:
error connecting to https://discovery.etcd.io, retrying in 4s
```

Conflicts:
	client/client.go
2015-08-21 14:55:35 -07:00
Xiang Li 3a346eac25 discovery: print out detailed cluster error
Conflicts:
	discovery/discovery.go
2015-08-21 14:54:36 -07:00
Xiang Li 97605046c1 client: return cluster error if the etcd cluster is not available
Add a new ClusterError type. It contains all encountered errors and
returns ClusterNotAvailable as the error string.

Conflicts:
	client/client.go
	discovery/discovery.go
2015-08-21 14:51:41 -07:00
Guohua Ouyang 41ecf7f722 etcdmain: don't print flags on flag parse error
At present it prints the whole usage and flags, which causes the exact
error message to be hidden two screens above.

Fixes #3141

Signed-off-by: Guohua Ouyang <gouyang@redhat.com>
2015-08-21 14:32:49 -07:00
Xiang Li fcd564efb8 etcdmain: fix initialization conflict
Fix #3142

Ignore flags if etcd is already initialized.
2015-08-21 14:32:41 -07:00
Yicheng Qin 0876c5e1ef etcdmain: warn when listening on HTTP if TLS is set
If the user sets TLS info, this implies they want to listen on TLS.
If etcd finds that the listen URLs still use the HTTP scheme, it prints a
warning to notify the user of a possibly wrong setting.
2015-08-21 14:32:31 -07:00
Yicheng Qin ef80bb5cbf pkg/transport: fix HTTPS downgrade bug for keepalive listener
If the TLS config is empty, etcd silently downgrades the keepalive listener
from HTTPS to HTTP, causing an HTTPS downgrade bug for client URLs.
This commit returns an error if etcd cannot listen on TLS.
2015-08-21 14:32:18 -07:00
30 changed files with 498 additions and 204 deletions


@ -28,7 +28,7 @@ To start etcd automatically using custom settings at startup in Linux, using a [
+ default: "100"
##### -election-timeout
+ Time (in milliseconds) for an election to timeout.
+ Time (in milliseconds) for an election to timeout. See [Documentation/tuning.md](tuning.md#time-parameters) for details.
+ default: "1000"
##### -listen-peer-urls


@ -25,6 +25,8 @@ The election timeout should be set based on the heartbeat interval and your netw
Election timeouts should be at least 10 times your ping time so it can account for variance in your network.
For example, if the ping time between your nodes is 10ms then you should have at least a 100ms election timeout.
The upper limit of election timeout is 50000ms, which should only be used when deploying a globally-distributed etcd cluster. First, 5s is the upper limit of the average global round-trip time: a reasonable round-trip time within the continental United States is 130ms, and the round-trip time between the US and Japan is around 350-400ms. Since packets can be delayed and network conditions may be poor, 5s is a safe upper bound. Then, because the election timeout should be an order of magnitude bigger than the broadcast time, 50s becomes its maximum.
You should also set your election timeout to at least 5 to 10 times your heartbeat interval to account for variance in leader replication.
For a heartbeat interval of 50ms you should set your election timeout to at least 250ms - 500ms.
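As a hedged illustration of these guidelines (values are examples only, using the flags documented above):

```
etcd -heartbeat-interval=50 -election-timeout=500
```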


@ -30,6 +30,7 @@ import (
var (
ErrNoEndpoints = errors.New("client: no endpoints available")
ErrTooManyRedirects = errors.New("client: too many redirects")
ErrClusterUnavailable = errors.New("client: etcd cluster is unavailable or misconfigured")
errTooManyRedirectChecks = errors.New("client: too many redirect checks")
)
@ -223,23 +224,32 @@ func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Respo
var resp *http.Response
var body []byte
var err error
cerr := &ClusterError{}
for _, ep := range eps {
hc := c.clientFactory(ep)
resp, body, err = hc.Do(ctx, action)
if err != nil {
cerr.Errors = append(cerr.Errors, err)
if err == context.DeadlineExceeded || err == context.Canceled {
return nil, nil, err
return nil, nil, cerr
}
continue
}
if resp.StatusCode/100 == 5 {
switch resp.StatusCode {
case http.StatusInternalServerError, http.StatusServiceUnavailable:
// TODO: make sure this is a no leader response
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", ep.String()))
default:
cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", ep.String(), http.StatusText(resp.StatusCode)))
}
continue
}
break
return resp, body, nil
}
return resp, body, err
return nil, nil, cerr
}
func (c *httpClusterClient) Endpoints() []string {


@ -342,7 +342,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
},
),
},
wantErr: context.DeadlineExceeded,
wantErr: &ClusterError{Errors: []error{context.DeadlineExceeded}},
},
// context.Canceled short-circuits Do
@ -356,7 +356,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
},
),
},
wantErr: context.Canceled,
wantErr: &ClusterError{Errors: []error{context.Canceled}},
},
// return err if there are no endpoints
@ -379,7 +379,7 @@ func TestHTTPClusterClientDo(t *testing.T) {
},
),
},
wantErr: fakeErr,
wantErr: &ClusterError{Errors: []error{fakeErr, fakeErr}},
},
// 500-level errors cause Do to fallthrough to next endpoint

client/cluster_error.go (new file)

@ -0,0 +1,33 @@
// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import "fmt"
type ClusterError struct {
Errors []error
}
func (ce *ClusterError) Error() string {
return ErrClusterUnavailable.Error()
}
func (ce *ClusterError) Detail() string {
s := ""
for i, e := range ce.Errors {
s += fmt.Sprintf("error #%d: %s\n", i, e)
}
return s
}
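A minimal usage sketch for ClusterError, with a hypothetical handleErr caller (the discovery changes below use the same pattern):

```
package client

import "fmt"

// handleErr shows how a caller can unwrap a ClusterError: Error() gives the
// one-line summary, while Detail() lists each collected endpoint error.
func handleErr(err error) {
	if ce, ok := err.(*ClusterError); ok {
		fmt.Println(ce.Error()) // client: etcd cluster is unavailable or misconfigured
		fmt.Print(ce.Detail())  // error #0: ..., one line per endpoint error
	}
}
```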


@ -28,13 +28,14 @@ import (
)
const (
ErrorCodeKeyNotFound = 100
ErrorCodeTestFailed = 101
ErrorCodeNotFile = 102
ErrorCodeNotDir = 104
ErrorCodeNodeExist = 105
ErrorCodeRootROnly = 107
ErrorCodeDirNotEmpty = 108
ErrorCodeKeyNotFound = 100
ErrorCodeTestFailed = 101
ErrorCodeNotFile = 102
ErrorCodeNotDir = 104
ErrorCodeNodeExist = 105
ErrorCodeRootROnly = 107
ErrorCodeDirNotEmpty = 108
ErrorCodeUnauthorized = 110
ErrorCodePrevValueRequired = 201
ErrorCodeTTLNaN = 202


@ -216,7 +216,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
if eerr, ok := err.(*client.Error); ok && eerr.Code == client.ErrorCodeKeyNotFound {
return nil, 0, 0, ErrSizeNotFound
}
if err == context.DeadlineExceeded {
if ce, ok := err.(*client.ClusterError); ok {
plog.Error(ce.Detail())
return d.checkClusterRetry()
}
return nil, 0, 0, err
@ -230,7 +231,8 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
resp, err = d.c.Get(ctx, d.cluster, nil)
cancel()
if err != nil {
if err == context.DeadlineExceeded {
if ce, ok := err.(*client.ClusterError); ok {
plog.Error(ce.Detail())
return d.checkClusterRetry()
}
return nil, 0, 0, err
@ -261,7 +263,7 @@ func (d *discovery) checkCluster() ([]*client.Node, int, uint64, error) {
func (d *discovery) logAndBackoffForRetry(step string) {
d.retries++
retryTime := time.Second * (0x1 << d.retries)
plog.Infof("%s: connection to %s timed out, retrying in %s", step, d.url, retryTime)
plog.Infof("%s: error connecting to %s, retrying in %s", step, d.url, retryTime)
d.clock.Sleep(retryTime)
}
@ -306,7 +308,8 @@ func (d *discovery) waitNodes(nodes []*client.Node, size int, index uint64) ([]*
plog.Noticef("found %d peer(s), waiting for %d more", len(all), size-len(all))
resp, err := w.Next(context.Background())
if err != nil {
if err == context.DeadlineExceeded {
if ce, ok := err.(*client.ClusterError); ok {
plog.Error(ce.Detail())
return d.waitNodesRetry()
}
return nil, err


@ -488,7 +488,7 @@ type clientWithRetry struct {
func (c *clientWithRetry) Create(ctx context.Context, key string, value string) (*client.Response, error) {
if c.failCount < c.failTimes {
c.failCount++
return nil, context.DeadlineExceeded
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
}
return c.clientWithResp.Create(ctx, key, value)
}
@ -496,7 +496,7 @@ func (c *clientWithRetry) Create(ctx context.Context, key string, value string)
func (c *clientWithRetry) Get(ctx context.Context, key string, opts *client.GetOptions) (*client.Response, error) {
if c.failCount < c.failTimes {
c.failCount++
return nil, context.DeadlineExceeded
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
}
return c.clientWithResp.Get(ctx, key, opts)
}
@ -511,7 +511,7 @@ type watcherWithRetry struct {
func (w *watcherWithRetry) Next(context.Context) (*client.Response, error) {
if w.failCount < w.failTimes {
w.failCount++
return nil, context.DeadlineExceeded
return nil, &client.ClusterError{Errors: []error{context.DeadlineExceeded}}
}
if len(w.rs) == 0 {
return &client.Response{}, nil


@ -35,6 +35,7 @@ var errors = map[int]string{
EcodeRootROnly: "Root is read only",
EcodeDirNotEmpty: "Directory not empty",
ecodeExistingPeerAddr: "Peer address has existed",
EcodeUnauthorized: "The request requires user authentication",
// Post form related errors
ecodeValueRequired: "Value is Required in POST form",
@ -68,6 +69,7 @@ var errorStatus = map[int]int{
EcodeKeyNotFound: http.StatusNotFound,
EcodeNotFile: http.StatusForbidden,
EcodeDirNotEmpty: http.StatusForbidden,
EcodeUnauthorized: http.StatusUnauthorized,
EcodeTestFailed: http.StatusPreconditionFailed,
EcodeNodeExist: http.StatusPreconditionFailed,
EcodeRaftInternal: http.StatusInternalServerError,
@ -85,6 +87,7 @@ const (
EcodeRootROnly = 107
EcodeDirNotEmpty = 108
ecodeExistingPeerAddr = 109
EcodeUnauthorized = 110
ecodeValueRequired = 200
EcodePrevValueRequired = 201


@ -2,141 +2,98 @@ package command
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"sort"
"strings"
"os/signal"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
func NewClusterHealthCommand() cli.Command {
return cli.Command{
Name: "cluster-health",
Usage: "check the health of the etcd cluster",
Flags: []cli.Flag{},
Name: "cluster-health",
Usage: "check the health of the etcd cluster",
Flags: []cli.Flag{
cli.BoolFlag{Name: "forever", Usage: "forever check the health every 10 seconds until CTRL+C"},
},
Action: handleClusterHealth,
}
}
func handleClusterHealth(c *cli.Context) {
endpoints, err := getEndpoints(c)
if err != nil {
handleError(ExitServerError, err)
forever := c.Bool("forever")
if forever {
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, os.Interrupt)
go func() {
<-sigch
os.Exit(0)
}()
}
tr, err := getTransport(c)
if err != nil {
handleError(ExitServerError, err)
}
client := etcd.NewClient(endpoints)
client.SetTransport(tr)
if c.GlobalBool("debug") {
go dumpCURL(client)
}
if ok := client.SyncCluster(); !ok {
handleError(ExitBadConnection, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
}
// do we have a leader?
cl := client.GetCluster()
ep, ls0, err := getLeaderStats(tr, cl)
if err != nil {
fmt.Println("cluster may be unhealthy: failed to connect", cl)
os.Exit(1)
}
// is raft stable and making progress?
client = etcd.NewClient([]string{ep})
client.SetTransport(tr)
resp, err := client.Get("/", false, false)
if err != nil {
fmt.Println("cluster is unhealthy")
os.Exit(1)
}
rt0, ri0 := resp.RaftTerm, resp.RaftIndex
time.Sleep(time.Second)
resp, err = client.Get("/", false, false)
if err != nil {
fmt.Println("cluster is unhealthy")
os.Exit(1)
}
rt1, ri1 := resp.RaftTerm, resp.RaftIndex
if rt0 != rt1 {
fmt.Println("cluster is unhealthy")
os.Exit(1)
}
if ri1 == ri0 {
fmt.Println("cluster is unhealthy")
os.Exit(1)
}
// are all the members making progress?
_, ls1, err := getLeaderStats(tr, []string{ep})
if err != nil {
fmt.Println("cluster is unhealthy")
os.Exit(1)
}
fmt.Println("cluster is healthy")
// self is healthy
var prints []string
prints = append(prints, fmt.Sprintf("member %s is healthy\n", ls1.Leader))
for name, fs0 := range ls0.Followers {
fs1, ok := ls1.Followers[name]
if !ok {
fmt.Println("Cluster configuration changed during health checking. Please retry.")
os.Exit(1)
}
if fs1.Counts.Success <= fs0.Counts.Success {
prints = append(prints, fmt.Sprintf("member %s is unhealthy\n", name))
} else {
prints = append(prints, fmt.Sprintf("member %s is healthy\n", name))
}
}
sort.Strings(prints)
for _, p := range prints {
fmt.Print(p)
}
os.Exit(0)
}
func getLeaderStats(tr *http.Transport, endpoints []string) (string, *stats.LeaderStats, error) {
// go-etcd does not support cluster stats, use http client for now
// TODO: use new etcd client with new member/stats endpoint
httpclient := http.Client{
hc := http.Client{
Transport: tr,
}
for _, ep := range endpoints {
resp, err := httpclient.Get(ep + "/v2/stats/leader")
if err != nil {
continue
mi := mustNewMembersAPI(c)
ms, err := mi.List(context.TODO())
if err != nil {
fmt.Println("cluster may be unhealthy: failed to list members")
handleError(ExitServerError, err)
}
for {
health := false
for _, m := range ms {
checked := false
for _, url := range m.ClientURLs {
resp, err := hc.Get(url + "/health")
if err != nil {
fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
continue
}
result := struct{ Health string }{}
d := json.NewDecoder(resp.Body)
err = d.Decode(&result)
resp.Body.Close()
if err != nil {
fmt.Printf("failed to check the health of member %s on %s: %v\n", m.ID, url, err)
continue
}
checked = true
if result.Health == "true" {
health = true
fmt.Printf("member %s is healthy: got healthy result from %s\n", m.ID, url)
} else {
fmt.Printf("member %s is unhealthy: got unhealthy result from %s\n", m.ID, url)
}
break
}
if !checked {
fmt.Printf("member %s is unreachable: %v are all unreachable\n", m.ID, m.ClientURLs)
}
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
continue
if health {
fmt.Println("cluster is healthy")
} else {
fmt.Println("cluster is unhealthy")
}
ls := &stats.LeaderStats{}
d := json.NewDecoder(resp.Body)
err = d.Decode(ls)
if err != nil {
continue
if !forever {
break
}
return ep, ls, nil
fmt.Printf("\nnext check after 10 second...\n\n")
time.Sleep(10 * time.Second)
}
return "", nil, errors.New("no leader")
}


@ -41,6 +41,10 @@ const (
clusterStateFlagExisting = "existing"
defaultName = "default"
// maxElectionMs specifies the maximum value of election timeout.
// More details are listed in ../Documentation/tuning.md#time-parameters.
maxElectionMs = 50000
)
var (
@ -137,7 +141,6 @@ func NewConfig() *config {
fs := cfg.FlagSet
fs.Usage = func() {
fmt.Println(usageline)
fmt.Println(flagsline)
}
// member
@ -225,6 +228,7 @@ func (cfg *config) Parse(arguments []string) error {
switch perr {
case nil:
case flag.ErrHelp:
fmt.Println(flagsline)
os.Exit(0)
default:
os.Exit(2)
@ -293,6 +297,9 @@ func (cfg *config) Parse(arguments []string) error {
if 5*cfg.TickMs > cfg.ElectionMs {
return fmt.Errorf("-election-timeout[%vms] should be at least as 5 times as -heartbeat-interval[%vms]", cfg.ElectionMs, cfg.TickMs)
}
if cfg.ElectionMs > maxElectionMs {
return fmt.Errorf("-election-timeout[%vms] is too long, and should be set less than %vms", cfg.ElectionMs, maxElectionMs)
}
return nil
}


@ -37,6 +37,7 @@ import (
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/proxy"
"github.com/coreos/etcd/rafthttp"
"github.com/coreos/etcd/version"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/prometheus/client_golang/prometheus"
@ -73,6 +74,11 @@ func Main() {
var stopped <-chan struct{}
plog.Infof("etcd Version: %s\n", version.Version)
plog.Infof("Git SHA: %s\n", version.GitSHA)
plog.Infof("Go Version: %s\n", runtime.Version())
plog.Infof("Go OS/Arch: %s/%s\n", runtime.GOOS, runtime.GOARCH)
GoMaxProcs := 1
if envMaxProcs, err := strconv.Atoi(os.Getenv("GOMAXPROCS")); err == nil {
GoMaxProcs = envMaxProcs
@ -93,19 +99,28 @@ func Main() {
which := identifyDataDirOrDie(cfg.dir)
if which != dirEmpty {
plog.Noticef("the server is already initialized as %v before, starting as etcd %v...", which, which)
}
shouldProxy := cfg.isProxy() || which == dirProxy
if !shouldProxy {
stopped, err = startEtcd(cfg)
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
shouldProxy = true
switch which {
case dirMember:
stopped, err = startEtcd(cfg)
case dirProxy:
err = startProxy(cfg)
default:
plog.Panicf("unhandled dir type %v", which)
}
} else {
shouldProxy := cfg.isProxy()
if !shouldProxy {
stopped, err = startEtcd(cfg)
if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
plog.Noticef("discovery cluster full, falling back to %s", fallbackFlagProxy)
shouldProxy = true
}
}
if shouldProxy {
err = startProxy(cfg)
}
}
if shouldProxy {
err = startProxy(cfg)
}
if err != nil {
switch err {
case discovery.ErrDuplicateID:
@ -142,6 +157,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
}
plns := make([]net.Listener, 0)
for _, u := range cfg.lpurls {
if u.Scheme == "http" && !cfg.peerTLSInfo.Empty() {
plog.Warningf("The scheme of peer url %s is http while peer key/cert files are presented. Ignored peer key/cert files.", u.String())
}
var l net.Listener
l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
@ -164,6 +182,9 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
}
clns := make([]net.Listener, 0)
for _, u := range cfg.lcurls {
if u.Scheme == "http" && !cfg.clientTLSInfo.Empty() {
plog.Warningf("The scheme of client url %s is http while client key/cert files are presented. Ignored client key/cert files.", u.String())
}
var l net.Listener
l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
if err != nil {
@ -240,10 +261,10 @@ func startProxy(cfg *config) error {
}
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
if err != nil {
return err
}
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
tr, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, time.Duration(cfg.proxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.proxyWriteTimeoutMs)*time.Millisecond)
if err != nil {


@ -36,7 +36,7 @@ member flags:
--heartbeat-interval '100'
time (in milliseconds) of a heartbeat interval.
--election-timeout '1000'
time (in milliseconds) for an election to timeout.
time (in milliseconds) for an election to timeout. See tuning documentation for details.
--listen-peer-urls 'http://localhost:2380,http://localhost:7001'
list of URLs to listen on for peer traffic.
--listen-client-urls 'http://localhost:2379,http://localhost:4001'


@ -21,12 +21,12 @@ import (
"encoding/json"
"fmt"
"path"
"reflect"
"sort"
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-semver/semver"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -421,7 +421,7 @@ func ValidateClusterAndAssignIDs(local *cluster, existing *cluster) error {
sort.Sort(MembersByPeerURLs(lms))
for i := range ems {
if !reflect.DeepEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
if !netutil.URLStringsEqual(ems[i].PeerURLs, lms[i].PeerURLs) {
return fmt.Errorf("unmatched member while checking PeerURLs")
}
lms[i].ID = ems[i].ID


@ -18,9 +18,9 @@ import (
"fmt"
"net/http"
"path"
"reflect"
"sort"
"github.com/coreos/etcd/pkg/netutil"
"github.com/coreos/etcd/pkg/types"
)
@ -93,7 +93,7 @@ func (c *ServerConfig) verifyLocalMember(strict bool) error {
sort.Strings(apurls)
urls.Sort()
if strict {
if !reflect.DeepEqual(apurls, urls.StringSlice()) {
if !netutil.URLStringsEqual(apurls, urls.StringSlice()) {
return fmt.Errorf("advertise URLs of %q do not match in --initial-advertise-peer-urls %s and --initial-cluster %s", c.Name, apurls, urls.StringSlice())
}
}


@ -18,32 +18,20 @@ import (
"errors"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
var (
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found")
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrUnknownMethod = errors.New("etcdserver: unknown method")
ErrStopped = errors.New("etcdserver: server stopped")
ErrIDRemoved = errors.New("etcdserver: ID removed")
ErrIDExists = errors.New("etcdserver: ID exists")
ErrIDNotFound = errors.New("etcdserver: ID not found")
ErrPeerURLexists = errors.New("etcdserver: peerURL exists")
ErrCanceled = errors.New("etcdserver: request cancelled")
ErrTimeout = errors.New("etcdserver: request timed out")
ErrTimeoutDueToLeaderFail = errors.New("etcdserver: request timed out, possibly due to previous leader failure")
)
func parseCtxErr(err error) error {
switch err {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
return ErrTimeout
default:
return err
}
}
func isKeyNotFound(err error) bool {
e, ok := err.(*etcdErr.Error)
return ok && e.ErrorCode == etcdErr.EcodeKeyNotFound


@ -127,19 +127,19 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
rr, err := parseKeyRequest(r, clockwork.NewRealClock())
if err != nil {
writeError(w, err)
writeKeyError(w, err)
return
}
// The path must be valid at this point (we've parsed the request successfully).
if !hasKeyPrefixAccess(h.sec, r, r.URL.Path[len(keysPrefix):], rr.Recursive) {
writeNoAuth(w)
writeKeyNoAuth(w)
return
}
resp, err := h.server.Do(ctx, rr)
if err != nil {
err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
writeError(w, err)
writeKeyError(w, err)
return
}
switch {
@ -153,7 +153,7 @@ func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer cancel()
handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
default:
writeError(w, errors.New("received response with no Event/Watcher!"))
writeKeyError(w, errors.New("received response with no Event/Watcher!"))
}
}
@ -549,6 +549,31 @@ func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTim
return json.NewEncoder(w).Encode(ev)
}
func writeKeyNoAuth(w http.ResponseWriter) {
e := etcdErr.NewError(etcdErr.EcodeUnauthorized, "Insufficient credentials", 0)
e.WriteTo(w)
}
// writeKeyError logs and writes the given Error to the ResponseWriter.
// If Error is not an etcdErr, the error will be converted to an etcd error.
func writeKeyError(w http.ResponseWriter, err error) {
if err == nil {
return
}
switch e := err.(type) {
case *etcdErr.Error:
e.WriteTo(w)
default:
if err == etcdserver.ErrTimeoutDueToLeaderFail {
plog.Error(err)
} else {
plog.Errorf("got unexpected response error (%v)", err)
}
ee := etcdErr.NewError(etcdErr.EcodeRaftInternal, err.Error(), 0)
ee.WriteTo(w)
}
}
func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
defer wa.Remove()
ech := wa.EventChan()


@ -1406,7 +1406,7 @@ func TestBadServeKeys(t *testing.T) {
},
http.StatusInternalServerError,
`{"message":"Internal Server Error"}`,
`{"errorCode":300,"message":"Raft Internal Error","cause":"Internal Server Error","index":0}`,
},
{
// etcdserver.Server etcd error
@ -1426,7 +1426,7 @@ func TestBadServeKeys(t *testing.T) {
},
http.StatusInternalServerError,
`{"message":"Internal Server Error"}`,
`{"errorCode":300,"message":"Raft Internal Error","cause":"received response with no Event/Watcher!","index":0}`,
},
}
for i, tt := range testBadCases {


@ -23,6 +23,7 @@ import (
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/auth"
"github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
)
@ -59,7 +60,11 @@ func writeError(w http.ResponseWriter, err error) {
herr := httptypes.NewHTTPError(e.HTTPStatus(), e.Error())
herr.WriteTo(w)
default:
plog.Errorf("got unexpected response error (%v)", err)
if err == etcdserver.ErrTimeoutDueToLeaderFail {
plog.Error(err)
} else {
plog.Errorf("got unexpected response error (%v)", err)
}
herr := httptypes.NewHTTPError(http.StatusInternalServerError, "Internal Server Error")
herr.WriteTo(w)
}


@ -19,6 +19,7 @@ import (
"expvar"
"os"
"sort"
"sync"
"sync/atomic"
"time"
@ -79,6 +80,17 @@ type apply struct {
}
type raftNode struct {
// Cache of the latest raft index and raft term the server has seen.
// These three uint64 fields must be the first elements to keep 64-bit
// alignment for atomic access to the fields.
index uint64
term uint64
lead uint64
mu sync.Mutex
// last lead elected time
lt time.Time
raft.Node
// a chan to send out apply
@ -99,11 +111,6 @@ type raftNode struct {
// If transport is nil, server will panic.
transport rafthttp.Transporter
// Cache of the latest raft index and raft term the server has seen
index uint64
term uint64
lead uint64
stopped chan struct{}
done chan struct{}
}
@ -118,6 +125,11 @@ func (r *raftNode) run() {
r.Tick()
case rd := <-r.Ready():
if rd.SoftState != nil {
if lead := atomic.LoadUint64(&r.lead); rd.SoftState.Lead != raft.None && lead != rd.SoftState.Lead {
r.mu.Lock()
r.lt = time.Now()
r.mu.Unlock()
}
atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
if rd.RaftState == raft.StateLeader {
syncC = r.s.SyncTicker
@ -175,6 +187,12 @@ func (r *raftNode) apply() chan apply {
return r.applyc
}
func (r *raftNode) leadElectedTime() time.Time {
r.mu.Lock()
defer r.mu.Unlock()
return r.lt
}
func (r *raftNode) stop() {
r.Stop()
r.transport.Stop()


@ -141,11 +141,13 @@ type Server interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
// r must be the first element to keep 64-bit alignment for atomic
// access to fields
r raftNode
cfg *ServerConfig
snapCount uint64
r raftNode
w wait.Wait
stop chan struct{}
done chan struct{}
@ -549,7 +551,7 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
case <-ctx.Done():
proposeFailed.Inc()
s.w.Trigger(r.ID, nil) // GC wait
return Response{}, parseCtxErr(ctx.Err())
return Response{}, s.parseProposeCtxErr(ctx.Err(), start)
case <-s.done:
return Response{}, ErrStopped
}
@ -644,6 +646,7 @@ func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
cc.ID = s.reqIDGen.Next()
ch := s.w.Register(cc.ID)
start := time.Now()
if err := s.r.ProposeConfChange(ctx, cc); err != nil {
s.w.Trigger(cc.ID, nil)
return err
@ -659,7 +662,7 @@ func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error
return nil
case <-ctx.Done():
s.w.Trigger(cc.ID, nil) // GC wait
return parseCtxErr(ctx.Err())
return s.parseProposeCtxErr(ctx.Err(), start)
case <-s.done:
return ErrStopped
}
@ -1000,3 +1003,19 @@ func (s *EtcdServer) updateClusterVersion(ver string) {
plog.Errorf("error updating cluster version (%v)", err)
}
}
func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
switch err {
case context.Canceled:
return ErrCanceled
case context.DeadlineExceeded:
curLeadElected := s.r.leadElectedTime()
prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.cfg.ElectionTicks) * time.Duration(s.cfg.TickMs) * time.Millisecond)
if start.After(prevLeadLost) && start.Before(curLeadElected) {
return ErrTimeoutDueToLeaderFail
}
return ErrTimeout
default:
return err
}
}
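As a worked example (values hypothetical): with TickMs=100 and ElectionTicks=10, prevLeadLost is 2 × 10 × 100ms = 2s before the current leader was elected, so a proposal that started inside that 2s window fails with ErrTimeoutDueToLeaderFail instead of the generic ErrTimeout.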


@ -566,6 +566,7 @@ func TestDoProposalCancelled(t *testing.T) {
func TestDoProposalTimeout(t *testing.T) {
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: &nodeRecorder{}},
w: &waitRecorder{},
reqIDGen: idutil.NewGenerator(0, time.Time{}),
@ -1023,6 +1024,7 @@ func TestPublishStopped(t *testing.T) {
func TestPublishRetry(t *testing.T) {
n := &nodeRecorder{}
srv := &EtcdServer{
cfg: &ServerConfig{TickMs: 1},
r: raftNode{Node: n},
w: &waitRecorder{},
done: make(chan struct{}),


@ -19,9 +19,12 @@ import (
"net"
"net/http"
"net/url"
"reflect"
"sort"
"strings"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/pkg/capnslog"
"github.com/coreos/etcd/pkg/types"
)
var (
@ -31,16 +34,25 @@ var (
resolveTCPAddr = net.ResolveTCPAddr
)
// ResolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
// ResolveTCPAddrs resolves all DNS hostnames in-place for the given set of
// url.URLs.
func ResolveTCPAddrs(urls ...[]url.URL) error {
// resolveTCPAddrs is a convenience wrapper for net.ResolveTCPAddr.
// resolveTCPAddrs return a new set of url.URLs, in which all DNS hostnames
// are resolved.
func resolveTCPAddrs(urls [][]url.URL) ([][]url.URL, error) {
newurls := make([][]url.URL, 0)
for _, us := range urls {
nus := make([]url.URL, len(us))
for i, u := range us {
nu, err := url.Parse(u.String())
if err != nil {
return nil, err
}
nus[i] = *nu
}
for i, u := range nus {
host, _, err := net.SplitHostPort(u.Host)
if err != nil {
plog.Errorf("could not parse url %s during tcp resolving", u.Host)
return err
return nil, err
}
if host == "localhost" {
continue
@ -51,13 +63,60 @@ func ResolveTCPAddrs(urls ...[]url.URL) error {
tcpAddr, err := resolveTCPAddr("tcp", u.Host)
if err != nil {
plog.Errorf("could not resolve host %s", u.Host)
return err
return nil, err
}
plog.Infof("resolving %s to %s", u.Host, tcpAddr.String())
us[i].Host = tcpAddr.String()
nus[i].Host = tcpAddr.String()
}
newurls = append(newurls, nus)
}
return newurls, nil
}
// urlsEqual checks equality of url.URLs between two slices.
// This check passes even if one URL uses a hostname and the other uses the
// corresponding IP address.
func urlsEqual(a []url.URL, b []url.URL) bool {
if len(a) != len(b) {
return false
}
urls, err := resolveTCPAddrs([][]url.URL{a, b})
if err != nil {
return false
}
a, b = urls[0], urls[1]
sort.Sort(types.URLs(a))
sort.Sort(types.URLs(b))
for i := range a {
if !reflect.DeepEqual(a[i], b[i]) {
return false
}
}
return nil
return true
}
func URLStringsEqual(a []string, b []string) bool {
if len(a) != len(b) {
return false
}
urlsA := make([]url.URL, 0)
for _, str := range a {
u, err := url.Parse(str)
if err != nil {
return false
}
urlsA = append(urlsA, *u)
}
urlsB := make([]url.URL, 0)
for _, str := range b {
u, err := url.Parse(str)
if err != nil {
return false
}
urlsB = append(urlsB, *u)
}
return urlsEqual(urlsA, urlsB)
}
// BasicAuth returns the username and password provided in the request's

View File

@ -124,15 +124,135 @@ func TestResolveTCPAddrs(t *testing.T) {
}
return &net.TCPAddr{IP: net.ParseIP(tt.hostMap[host]), Port: i, Zone: ""}, nil
}
err := ResolveTCPAddrs(tt.urls...)
urls, err := resolveTCPAddrs(tt.urls)
if tt.hasError {
if err == nil {
t.Errorf("expected error")
}
continue
}
if !reflect.DeepEqual(tt.urls, tt.expected) {
t.Errorf("expected: %v, got %v", tt.expected, tt.urls)
if !reflect.DeepEqual(urls, tt.expected) {
t.Errorf("expected: %v, got %v", tt.expected, urls)
}
}
}
func TestURLsEqual(t *testing.T) {
defer func() { resolveTCPAddr = net.ResolveTCPAddr }()
hostm := map[string]string{
"example.com": "10.0.10.1",
"first.com": "10.0.11.1",
"second.com": "10.0.11.2",
}
resolveTCPAddr = func(network, addr string) (*net.TCPAddr, error) {
host, port, err := net.SplitHostPort(addr)
if _, ok := hostm[host]; !ok {
return nil, errors.New("cannot resolve host.")
}
i, err := strconv.Atoi(port)
if err != nil {
return nil, err
}
return &net.TCPAddr{IP: net.ParseIP(hostm[host]), Port: i, Zone: ""}, nil
}
tests := []struct {
a []url.URL
b []url.URL
expect bool
}{
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.10.1:2379"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "127.0.0.1:2380"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "127.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "example.com:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.0.1:2379"}, {Scheme: "http", Host: "127.0.0.1:2380"}},
expect: false,
},
{
a: []url.URL{{Scheme: "http", Host: "first.com:2379"}, {Scheme: "http", Host: "second.com:2380"}},
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
expect: true,
},
{
a: []url.URL{{Scheme: "http", Host: "second.com:2380"}, {Scheme: "http", Host: "first.com:2379"}},
b: []url.URL{{Scheme: "http", Host: "10.0.11.1:2379"}, {Scheme: "http", Host: "10.0.11.2:2380"}},
expect: true,
},
}
for _, test := range tests {
result := urlsEqual(test.a, test.b)
if result != test.expect {
t.Errorf("a:%v b:%v, expected %v but %v", test.a, test.b, test.expect, result)
}
}
}
func TestURLStringsEqual(t *testing.T) {
result := URLStringsEqual([]string{"http://127.0.0.1:8080"}, []string{"http://127.0.0.1:8080"})
if !result {
t.Errorf("unexpected result %v", result)
}
}


@ -16,6 +16,7 @@ package transport
import (
"crypto/tls"
"fmt"
"net"
"time"
)
@ -28,7 +29,10 @@ func NewKeepAliveListener(addr string, scheme string, info TLSInfo) (net.Listene
return nil, err
}
if !info.Empty() && scheme == "https" {
if scheme == "https" {
if info.Empty() {
return nil, fmt.Errorf("cannot listen on TLS for %s: KeyFile and CertFile are not presented", scheme+"://"+addr)
}
cfg, err := info.ServerConfig()
if err != nil {
return nil, err


@ -62,3 +62,10 @@ func TestNewKeepAliveListener(t *testing.T) {
conn.Close()
tlsln.Close()
}
func TestNewKeepAliveListenerTLSEmptyInfo(t *testing.T) {
_, err := NewListener("127.0.0.1:0", "https", TLSInfo{})
if err == nil {
t.Errorf("err = nil, want not presented error")
}
}


@ -258,10 +258,10 @@ func (r *raft) sendAppend(to uint64) {
}
m.Snapshot = snapshot
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
raftLogger.Infof("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
raftLogger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
pr.becomeSnapshot(sindex)
raftLogger.Infof("%x paused sending replication messages to %x [%s]", r.id, to, pr)
raftLogger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
} else {
m.Type = pb.MsgApp
m.Index = pr.Next - 1
@ -538,7 +538,7 @@ func stepLeader(r *raft, m pb.Message) {
case pr.State == ProgressStateProbe:
pr.becomeReplicate()
case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
raftLogger.Infof("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
raftLogger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
pr.becomeProbe()
case pr.State == ProgressStateReplicate:
pr.ins.freeTo(m.Index)
@ -571,11 +571,11 @@ func stepLeader(r *raft, m pb.Message) {
}
if !m.Reject {
pr.becomeProbe()
raftLogger.Infof("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
raftLogger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
} else {
pr.snapshotFailure()
pr.becomeProbe()
raftLogger.Infof("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
raftLogger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
}
// If snapshot finish, wait for the msgAppResp from the remote node before sending
// out the next msgApp.


@ -31,9 +31,13 @@ import (
// event happens between the end of the first watch command and the start
// of the second command.
type watcherHub struct {
// count must be the first element to keep 64-bit alignment for atomic
// access
count int64 // current number of watchers.
mutex sync.Mutex
watchers map[string]*list.List
count int64 // current number of watchers.
EventHistory *EventHistory
}

test

@ -45,7 +45,13 @@ split=(${NO_RACE_TEST// / })
NO_RACE_TEST=${split[@]/#/${REPO_PATH}/}
echo "Running tests..."
go test -timeout 3m ${COVER} $@ ${TEST} --race -cpu 1,2,4
MACHINE_TYPE=$(uname -m)
if [ $MACHINE_TYPE != "armv7l" ]; then
RACE="--race"
fi
go test -timeout 3m ${COVER} $@ ${TEST} ${RACE} -cpu 1,2,4
go test -timeout 3m ${COVER} $@ ${NO_RACE_TEST} -cpu 1,2,4
if [ -n "$INTEGRATION" ]; then


@ -25,7 +25,7 @@ import (
var (
// MinClusterVersion is the min cluster version this etcd binary is compatible with.
MinClusterVersion = "2.0.0"
Version = "2.1.1+git"
Version = "2.1.3+git"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"