Compare commits

...

15 Commits

Author SHA1 Message Date
Yicheng Qin
92e3895214 *: bump to v2.0.13 2015-06-25 14:10:15 -07:00
Yicheng Qin
b12a52b0fd etcdmain: fix the check in fallback-to-proxy case
advertise-client-urls has to be set if listen-client-urls is set when
fallbacking to proxy, which breaks the behavior. Loosen the check to fix
it.
2015-06-25 14:07:45 -07:00
Yicheng Qin
9fa4002787 *: bump to v2.0.12+git 2015-06-16 14:20:25 -07:00
Yicheng Qin
5686c33e4b *: bump to v2.0.12 2015-06-16 14:19:37 -07:00
Yicheng Qin
6fd2dfdebc etcdmain: fix that advertise-client-urls is required in proxy mode
etcd proxy doesn't need to set advertise-client-urls because the flag is
not used.
2015-06-16 14:18:01 -07:00
Connor and Luan Santos
896ce1668c build: default git sha to GitNotFound in case git fails 2015-06-16 14:15:48 -07:00
Ryan Bourgeois
0520b4cd24 proxy: Reuse a bytes buffer as proxy request body.
The call to transport.RoundTrip closes the request body regardless of
the value of request.Closed. This causes subsequent calls to RoundTrip
using the same request body to fail.

Fixes #2895
2015-06-16 14:13:15 -07:00
Yicheng Qin
6ee6f72c48 etcdmain: increase maxIdleConnsPerHost in proxy transport
This PR set maxIdleConnsPerHost to 128 to let proxy handle 128 concurrent
requests in long term smoothly.
If the number of concurrent requests is bigger than this value,
proxy needs to create one new connection when handling each request in
the delta, which is bad because the creation consumes resource and may
eat up your ephemeral port.
2015-06-16 14:10:58 -07:00
Xiang Li
b4dd519a63 raft: fix raft node start bug
raft node should set initial prev hard state to empty.
Or it will not send the first hard coded state to application
until the state changes again.

This commit fixs the issue. It introduce a small overhead, that
the same tate might send to application twice when restarting.
But this is fine.
2015-06-16 14:10:41 -07:00
Yicheng Qin
a98fff84e7 etcdctl/cluster_health: improve output if failed to get leader stats
When failing to get leader stats, it said 'cluster is unhealthy' before.
This is confusing when it cannot get stats because advertised client urls
are set wrong and the cluster is healthy.
2015-06-16 14:07:45 -07:00
Yicheng Qin
973cfbebda *: make dial timeout configurable
Dial timeout is set shorter because
1. etcd is supposed to work in good environment, and the new value is long
enough
2. shorter dial timeout makes dial fail faster, which is good for
performance

Conflicts:
	etcdmain/etcd.go
2015-06-16 14:06:18 -07:00
Yicheng Qin
00d1d34cf8 Merge pull request #2832 from yichengq/stream-2.1
not print unhelpful info when connecting to etcd 2.1
2015-05-27 13:36:38 -07:00
Yicheng Qin
fcf81fd6bf *: bump to v2.0.11+git 2015-05-15 13:55:13 -07:00
Yicheng Qin
5074235254 rafthttp: stop printing log when attaching stream with the same term
There is no need to print log when attaching stream with the same
term because the stream is installed back immediately.

This happens a lot when etcd 2.1 connects to etcd 2.0, so we make
the change.
2015-05-14 21:52:59 -07:00
Yicheng Qin
f59bddd74b rafthttp: not log endpoint unsupport error
The error happens a lot when running 2.0 together with 2.1, and is
totally unhelpful.
2015-05-07 14:21:15 -07:00
16 changed files with 143 additions and 27 deletions

2
build
View File

@@ -11,7 +11,7 @@ ln -s ${PWD} $GOPATH/src/${REPO_PATH}
eval $(go env)
GIT_SHA=`git rev-parse --short HEAD`
GIT_SHA=`git rev-parse --short HEAD || echo "GitNotFound"`
# Static compilation is useful when etcd is run in a container
CGO_ENABLED=0 go build -a -installsuffix cgo -ldflags "-s -X ${REPO_PATH}/version.GitSHA ${GIT_SHA}" -o bin/etcd ${REPO_PATH}

View File

@@ -46,9 +46,10 @@ func handleClusterHealth(c *cli.Context) {
}
// do we have a leader?
ep, ls0, err := getLeaderStats(tr, client.GetCluster())
cl := client.GetCluster()
ep, ls0, err := getLeaderStats(tr, cl)
if err != nil {
fmt.Println("cluster is unhealthy")
fmt.Println("cluster may be unhealthy: failed to connect", cl)
os.Exit(1)
}

View File

@@ -258,8 +258,15 @@ func (cfg *config) Parse(arguments []string) error {
if err != nil {
return err
}
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
// when etcd runs in member mode user needs to set -advertise-client-urls if -listen-client-urls is set.
// TODO(yichengq): check this for joining through discovery service case
mayFallbackToProxy := flags.IsSet(cfg.FlagSet, "discovery") && cfg.fallback.String() == fallbackFlagProxy
mayBeProxy := cfg.proxy.String() != proxyFlagOff || mayFallbackToProxy
if !mayBeProxy {
if flags.IsSet(cfg.FlagSet, "listen-client-urls") && !flags.IsSet(cfg.FlagSet, "advertise-client-urls") {
return errUnsetAdvertiseClientURLsFlag
}
}
if 5*cfg.TickMs > cfg.ElectionMs {

View File

@@ -212,6 +212,71 @@ func TestConfigParsingConflictClusteringFlags(t *testing.T) {
}
}
func TestConfigParsingMissedAdvertiseClientURLsFlag(t *testing.T) {
tests := []struct {
args []string
werr error
}{
{
[]string{
"-initial-cluster=infra1=http://127.0.0.1:2380",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery-srv=example.com",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery=http://example.com/abc",
"-discovery-fallback=exit",
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-listen-client-urls=http://127.0.0.1:2379",
},
errUnsetAdvertiseClientURLsFlag,
},
{
[]string{
"-discovery=http://example.com/abc",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=on",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
{
[]string{
"-proxy=readonly",
"-listen-client-urls=http://127.0.0.1:2379",
},
nil,
},
}
for i, tt := range tests {
cfg := NewConfig()
err := cfg.Parse(tt.args)
if err != tt.werr {
t.Errorf("%d: err = %v, want %v", i, err, tt.werr)
}
}
}
func TestConfigIsNewCluster(t *testing.T) {
tests := []struct {
state string

View File

@@ -116,7 +116,7 @@ func startEtcd(cfg *config) (<-chan struct{}, error) {
return nil, fmt.Errorf("error setting up initial cluster: %v", err)
}
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
return nil, err
}
@@ -233,6 +233,7 @@ func startProxy(cfg *config) error {
}
pt, err := transport.NewTransport(cfg.clientTLSInfo)
pt.MaxIdleConnsPerHost = proxy.DefaultMaxIdleConnsPerHost
if err != nil {
return err
}

View File

@@ -663,7 +663,7 @@ func mustNewHTTPClient(t *testing.T, eps []string) client.HTTPClient {
}
func mustNewTransport(t *testing.T) *http.Transport {
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
tr, err := transport.NewTimeoutTransport(transport.TLSInfo{}, rafthttp.DialTimeout, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
if err != nil {
t.Fatal(err)
}

View File

@@ -23,7 +23,7 @@ import (
// NewTimeoutTransport returns a transport created using the given TLS info.
// If read/write on the created connection blocks longer than its time limit,
// it will return timeout error.
func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
func NewTimeoutTransport(info TLSInfo, dialtimeoutd, rdtimeoutd, wtimeoutd time.Duration) (*http.Transport, error) {
tr, err := NewTransport(info)
if err != nil {
return nil, err
@@ -33,7 +33,7 @@ func NewTimeoutTransport(info TLSInfo, rdtimeoutd, wtimeoutd time.Duration) (*ht
tr.MaxIdleConnsPerHost = -1
tr.Dial = (&rwTimeoutDialer{
Dialer: net.Dialer{
Timeout: 30 * time.Second,
Timeout: dialtimeoutd,
KeepAlive: 30 * time.Second,
},
rdtimeoutd: rdtimeoutd,

View File

@@ -26,7 +26,7 @@ import (
// TestNewTimeoutTransport tests that NewTimeoutTransport returns a transport
// that can dial out timeout connections.
func TestNewTimeoutTransport(t *testing.T) {
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour)
tr, err := NewTimeoutTransport(TLSInfo{}, time.Hour, time.Hour, time.Hour)
if err != nil {
t.Fatalf("unexpected NewTimeoutTransport error: %v", err)
}

View File

@@ -18,6 +18,17 @@ import (
"net/http"
)
const (
// DefaultMaxIdleConnsPerHost indicates the default maximal idle connections
// maintained between proxy and each member. We set it to 128 to
// let proxy handle 128 concurrent requests in long term smoothly.
// If the number of concurrent requests is bigger than this value,
// proxy needs to create one new connection when handling each request in
// the delta, which is bad because the creation consumes resource and
// may eat up ephemeral ports.
DefaultMaxIdleConnsPerHost = 128
)
// GetProxyURLs is a function which should return the current set of URLs to
// which client requests should be proxied. This function will be queried
// periodically by the proxy Handler to refresh the set of available

View File

@@ -15,8 +15,10 @@
package proxy
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@@ -55,6 +57,21 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
proxyreq := new(http.Request)
*proxyreq = *clientreq
var (
proxybody []byte
err error
)
if clientreq.Body != nil {
proxybody, err = ioutil.ReadAll(clientreq.Body)
if err != nil {
msg := fmt.Sprintf("proxy: failed to read request body: %v", err)
e := httptypes.NewHTTPError(http.StatusInternalServerError, msg)
e.WriteTo(rw)
return
}
}
// deep-copy the headers, as these will be modified below
proxyreq.Header = make(http.Header)
copyHeader(proxyreq.Header, clientreq.Header)
@@ -93,9 +110,11 @@ func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request
}
var res *http.Response
var err error
for _, ep := range endpoints {
if proxybody != nil {
proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
}
redirectRequest(proxyreq, ep.URL)
res, err = p.transport.RoundTrip(proxyreq)

View File

@@ -233,7 +233,7 @@ func (n *node) run(r *raft) {
lead := None
prevSoftSt := r.softState()
prevHardSt := r.HardState
prevHardSt := emptyState
for {
if advancec != nil {

View File

@@ -354,7 +354,7 @@ func TestNodeRestart(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 1}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries[:st.Commit],
}
@@ -389,7 +389,7 @@ func TestNodeRestartFromSnapshot(t *testing.T) {
st := raftpb.HardState{Term: 1, Commit: 3}
want := Ready{
HardState: emptyState,
HardState: st,
// commit up to index commit index in st
CommittedEntries: entries,
}

View File

@@ -121,7 +121,6 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fromStr := strings.TrimPrefix(r.URL.Path, RaftStreamPrefix+"/")
from, err := types.IDFromString(fromStr)
if err != nil {
log.Printf("rafthttp: path %s cannot be parsed", fromStr)
http.Error(w, "invalid path", http.StatusNotFound)
return
}

View File

@@ -40,6 +40,7 @@ const (
appRespBatchMs = 50
propBatchMs = 10
DialTimeout = time.Second
ConnReadTimeout = 5 * time.Second
ConnWriteTimeout = 5 * time.Second
)

View File

@@ -76,8 +76,11 @@ func (s *stream) attach(sw *streamWriter) error {
// ignore lower-term streaming request
if sw.term < s.w.term {
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
} else if sw.term == s.w.term {
s.w.stopWithoutLog()
} else {
s.w.stop()
}
s.w.stop()
}
s.w = sw
return nil
@@ -151,21 +154,23 @@ type WriteFlusher interface {
// TODO: replace fs with stream stats
type streamWriter struct {
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
printLog bool
}
// newStreamWriter starts and returns a new unstarted stream writer.
// The caller should call stop when finished, to shut it down.
func newStreamWriter(to types.ID, term uint64) *streamWriter {
s := &streamWriter{
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
printLog: true,
}
return s
}
@@ -188,7 +193,9 @@ func (s *streamWriter) send(ents []raftpb.Entry) error {
func (s *streamWriter) handle(w WriteFlusher) {
defer func() {
close(s.done)
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
if s.printLog {
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
}
}()
ew := newEntryWriter(w, s.to)
@@ -215,6 +222,11 @@ func (s *streamWriter) stop() {
<-s.done
}
func (s *streamWriter) stopWithoutLog() {
s.printLog = false
s.stop()
}
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
// TODO: move the raft interface out of the reader.

View File

@@ -23,7 +23,7 @@ import (
)
var (
Version = "2.0.11"
Version = "2.0.13"
// Git SHA Value will be set during build
GitSHA = "Not provided (use ./build instead of go build)"