Compare commits
15 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
92e3895214 | ||
![]() |
b12a52b0fd | ||
![]() |
9fa4002787 | ||
![]() |
5686c33e4b | ||
![]() |
6fd2dfdebc | ||
![]() |
896ce1668c | ||
![]() |
0520b4cd24 | ||
![]() |
6ee6f72c48 | ||
![]() |
b4dd519a63 | ||
![]() |
a98fff84e7 | ||
![]() |
973cfbebda | ||
![]() |
00d1d34cf8 | ||
![]() |
fcf81fd6bf | ||
![]() |
5074235254 | ||
![]() |
f59bddd74b |
2
build
2
build
@@ -11,7 +11,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
|
||||
|
||||
eval $(go env)
|
||||
|
||||
GIT_SHA=`git rev-parse --short HEAD`
|
||||
GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`
|
||||
|
||||
# Static compilation is useful when etcd is run in a container
|
||||
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}
|
||||
|
@@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
|
||||
}
|
||||
|
||||
// do we have a leader?
|
||||
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
|
||||
cl := client.GetCluster()
|
||||
ep, ls0, err := getLeaderStats(tr, cl)
|
||||
if err != nil {
|
||||
fmt.Println("cluster is unhealthy")
|
||||
fmt.Println("cluster may be unhealthy: failed to connect", cl)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
|
@@ -258,8 +258,15 @@ func (cfg *config) Parse(arguments []string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
|
||||
return errUnsetAdvertiseClientURLsFlag
|
||||
|
||||
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
|
||||
// TODO(yichengq): check this for joining through discovery service case
|
||||
mayFallbackToProxy := flags.IsSet(cfg.FlagSet, "discovery") && cfg.fallback.String() == fallbackFlagProxy
|
||||
mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
|
||||
if !mayBeProxy {
|
||||
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
|
||||
return errUnsetAdvertiseClientURLsFlag
|
||||
}
|
||||
}
|
||||
|
||||
if 5*cfg.TickMs > cfg.ElectionMs {
|
||||
|
@@ -212,6 +212,71 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
|
||||
tests := []struct {
|
||||
args []string
|
||||
werr error
|
||||
}{
|
||||
{
|
||||
[]string{
|
||||
"-initial-cluster=infra1=http://127.0.0.1:2380",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery-srv=example.com",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery=http://example.com/abc",
|
||||
"-discovery-fallback=exit",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
errUnsetAdvertiseClientURLsFlag,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-discovery=http://example.com/abc",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-proxy=on",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]string{
|
||||
"-proxy=readonly",
|
||||
"-listen-client-urls=http://127.0.0.1:2379",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
cfg := NewConfig()
|
||||
err := cfg.Parse(tt.args)
|
||||
if err != tt.werr {
|
||||
t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigIsNewCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
state string
|
||||
|
@@ -116,7 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
|
||||
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
|
||||
}
|
||||
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -233,6 +233,7 @@ func startProxy(cfg *config) error {
|
||||
}
|
||||
|
||||
pt, err := transport.NewTransport(cfg.clientTLSInfo)
|
||||
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -663,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
|
||||
}
|
||||
|
||||
func mustNewTransport(t *testing.T) *http.Transport {
|
||||
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@@ -23,7 +23,7 @@ import (
|
||||
// NewTimeoutTransport returns a transport created using the given TLS info.
|
||||
// If read/write on the created connection blocks longer than its time limit,
|
||||
// it will return timeout error.
|
||||
func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
|
||||
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
|
||||
tr, err := NewTransport(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -33,7 +33,7 @@ func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*ht
|
||||
tr.MaxIdleConnsPerHost = -1
|
||||
tr.Dial = (&rwTimeoutDialer{
|
||||
Dialer: net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
Timeout: dialtimeoutd,
|
||||
KeepAlive: 30 * time.Second,
|
||||
},
|
||||
rdtimeoutd: rdtimeoutd,
|
||||
|
@@ -26,7 +26,7 @@ import (
|
||||
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
|
||||
// that can dial out timeout connections.
|
||||
func TestNewTimeoutTransport(t *testing.T) {
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
|
||||
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
|
||||
}
|
||||
|
@@ -18,6 +18,17 @@ import (
|
||||
"net/http"
|
||||
)
|
||||
|
||||
const (
|
||||
// DefaultMaxIdleConnsPerHost indicates the default maximal idle connections
|
||||
// maintained between proxy and each member. We set it to 128 to
|
||||
// let proxy handle 128 concurrent requests in long term smoothly.
|
||||
// If the number of concurrent requests is bigger than this value,
|
||||
// proxy needs to create one new connection when handling each request in
|
||||
// the delta, which is bad because the creation consumes resource and
|
||||
// may eat up ephemeral ports.
|
||||
DefaultMaxIdleConnsPerHost = 128
|
||||
)
|
||||
|
||||
// GetProxyURLs is a function which should return the current set of URLs to
|
||||
// which client requests should be proxied. This function will be queried
|
||||
// periodically by the proxy Handler to refresh the set of available
|
||||
|
@@ -15,8 +15,10 @@
|
||||
package proxy
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
proxyreq := new(http.Request)
|
||||
*proxyreq = *clientreq
|
||||
|
||||
var (
|
||||
proxybody []byte
|
||||
err error
|
||||
)
|
||||
|
||||
if clientreq.Body != nil {
|
||||
proxybody, err = ioutil.ReadAll(clientreq.Body)
|
||||
if err != nil {
|
||||
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
|
||||
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
|
||||
e.WriteTo(rw)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// deep-copy the headers, as these will be modified below
|
||||
proxyreq.Header = make(http.Header)
|
||||
copyHeader(proxyreq.Header, clientreq.Header)
|
||||
@@ -93,9 +110,11 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
|
||||
}
|
||||
|
||||
var res *http.Response
|
||||
var err error
|
||||
|
||||
for _, ep := range endpoints {
|
||||
if proxybody != nil {
|
||||
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
|
||||
}
|
||||
redirectRequest(proxyreq, ep.URL)
|
||||
|
||||
res, err = p.transport.RoundTrip(proxyreq)
|
||||
|
@@ -233,7 +233,7 @@ func (n *node) run(r *raft) {
|
||||
|
||||
lead := None
|
||||
prevSoftSt := r.softState()
|
||||
prevHardSt := r.HardState
|
||||
prevHardSt := emptyState
|
||||
|
||||
for {
|
||||
if advancec != nil {
|
||||
|
@@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 1}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries[:st.Commit],
|
||||
}
|
||||
@@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
|
||||
st := raftpb.HardState{Term: 1, Commit: 3}
|
||||
|
||||
want := Ready{
|
||||
HardState: emptyState,
|
||||
HardState: st,
|
||||
// commit up to index commit index in st
|
||||
CommittedEntries: entries,
|
||||
}
|
||||
|
@@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
|
||||
from, err := types.IDFromString(fromStr)
|
||||
if err != nil {
|
||||
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
|
||||
http.Error(w, "invalid path", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
@@ -40,6 +40,7 @@ const (
|
||||
appRespBatchMs = 50
|
||||
propBatchMs = 10
|
||||
|
||||
DialTimeout = time.Second
|
||||
ConnReadTimeout = 5 * time.Second
|
||||
ConnWriteTimeout = 5 * time.Second
|
||||
)
|
||||
|
@@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
|
||||
// ignore lower-term streaming request
|
||||
if sw.term < s.w.term {
|
||||
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
|
||||
} else if sw.term == s.w.term {
|
||||
s.w.stopWithoutLog()
|
||||
} else {
|
||||
s.w.stop()
|
||||
}
|
||||
s.w.stop()
|
||||
}
|
||||
s.w = sw
|
||||
return nil
|
||||
@@ -151,21 +154,23 @@ type WriteFlusher interface {
|
||||
|
||||
// TODO: replace fs with stream stats
|
||||
type streamWriter struct {
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
to types.ID
|
||||
term uint64
|
||||
fs *stats.FollowerStats
|
||||
q chan []raftpb.Entry
|
||||
done chan struct{}
|
||||
printLog bool
|
||||
}
|
||||
|
||||
// newStreamWriter starts and returns a new unstarted stream writer.
|
||||
// The caller should call stop when finished, to shut it down.
|
||||
func newStreamWriter(to types.ID, term uint64) *streamWriter {
|
||||
s := &streamWriter{
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
to: to,
|
||||
term: term,
|
||||
q: make(chan []raftpb.Entry, streamBufSize),
|
||||
done: make(chan struct{}),
|
||||
printLog: true,
|
||||
}
|
||||
return s
|
||||
}
|
||||
@@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
|
||||
func (s *streamWriter) handle(w WriteFlusher) {
|
||||
defer func() {
|
||||
close(s.done)
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
if s.printLog {
|
||||
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
|
||||
}
|
||||
}()
|
||||
|
||||
ew := newEntryWriter(w, s.to)
|
||||
@@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
|
||||
<-s.done
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopWithoutLog() {
|
||||
s.printLog = false
|
||||
s.stop()
|
||||
}
|
||||
|
||||
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
|
||||
|
||||
// TODO: move the raft interface out of the reader.
|
||||
|
@@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
Version = "2.0.11"
|
||||
Version = "2.0.13"
|
||||
|
||||
// Git SHA Value will be set during build
|
||||
GitSHA = "Not provided (use ./build instead of go build)"
|
||||
|
Reference in New Issue
Block a user