chore(peer_server): set client transporter separately
It also moves the hack on timeout from raft transporter to client transporter.release-0.4
parent
001b1fcd46
commit
c9ce14c857
36
etcd/etcd.go
36
etcd/etcd.go
|
@ -28,6 +28,7 @@ import (
|
|||
goetcd "github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
|
||||
golog "github.com/coreos/etcd/third_party/github.com/coreos/go-log/log"
|
||||
"github.com/coreos/etcd/third_party/github.com/goraft/raft"
|
||||
httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
|
||||
|
||||
"github.com/coreos/etcd/config"
|
||||
ehttp "github.com/coreos/etcd/http"
|
||||
|
@ -37,6 +38,8 @@ import (
|
|||
"github.com/coreos/etcd/store"
|
||||
)
|
||||
|
||||
const extraTimeout = 1000
|
||||
|
||||
type Etcd struct {
|
||||
Config *config.Config // etcd config
|
||||
Store store.Store // data store
|
||||
|
@ -134,14 +137,33 @@ func (e *Etcd) Run() {
|
|||
// Calculate all of our timeouts
|
||||
heartbeatInterval := time.Duration(e.Config.Peer.HeartbeatInterval) * time.Millisecond
|
||||
electionTimeout := time.Duration(e.Config.Peer.ElectionTimeout) * time.Millisecond
|
||||
// TODO(yichengq): constant 1000 is a hack here. The reason to use this
|
||||
// is to ensure etcd instances could start successfully at the same time.
|
||||
// Current problem for the failure comes from the lag between join command
|
||||
dialTimeout := (3 * heartbeatInterval) + electionTimeout
|
||||
responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout
|
||||
|
||||
// TODO(yichengq): constant extraTimeout is a hack here.
|
||||
// Current problem is that there is big lag between join command
|
||||
// execution and join success.
|
||||
// Fix it later. It should be removed when proper method is found and
|
||||
// enough tests are provided.
|
||||
dialTimeout := (3 * heartbeatInterval) + electionTimeout + 1000
|
||||
responseHeaderTimeout := (3 * heartbeatInterval) + electionTimeout + 1000
|
||||
// enough tests are provided. It is expected to be calculated from
|
||||
// heartbeatInterval and electionTimeout only.
|
||||
clientTransporter := &httpclient.Transport{
|
||||
ResponseHeaderTimeout: responseHeaderTimeout + extraTimeout,
|
||||
// This is a workaround for Transport.CancelRequest doesn't work on
|
||||
// HTTPS connections blocked. The patch for it is in progress,
|
||||
// and would be available in Go1.3
|
||||
// More: https://codereview.appspot.com/69280043/
|
||||
ConnectTimeout: dialTimeout + extraTimeout,
|
||||
RequestTimeout: responseHeaderTimeout + dialTimeout + 2*extraTimeout,
|
||||
}
|
||||
if e.Config.PeerTLSInfo().Scheme() == "https" {
|
||||
clientTLSConfig, err := e.Config.PeerTLSInfo().ClientConfig()
|
||||
if err != nil {
|
||||
log.Fatal("client TLS error: ", err)
|
||||
}
|
||||
clientTransporter.TLSClientConfig = clientTLSConfig
|
||||
clientTransporter.DisableCompression = true
|
||||
}
|
||||
client := server.NewClient(clientTransporter)
|
||||
|
||||
// Create peer server
|
||||
psConfig := server.PeerServerConfig{
|
||||
|
@ -152,7 +174,7 @@ func (e *Etcd) Run() {
|
|||
RetryTimes: e.Config.MaxRetryAttempts,
|
||||
RetryInterval: e.Config.RetryInterval,
|
||||
}
|
||||
e.PeerServer = server.NewPeerServer(psConfig, e.Registry, e.Store, &mb, followersStats, serverStats)
|
||||
e.PeerServer = server.NewPeerServer(psConfig, client, e.Registry, e.Store, &mb, followersStats, serverStats)
|
||||
|
||||
// Create raft transporter and server
|
||||
raftTransporter := server.NewTransporter(followersStats, serverStats, e.Registry, heartbeatInterval, dialTimeout, responseHeaderTimeout)
|
||||
|
|
|
@ -80,9 +80,10 @@ type snapshotConf struct {
|
|||
snapshotThr uint64
|
||||
}
|
||||
|
||||
func NewPeerServer(psConfig PeerServerConfig, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
|
||||
func NewPeerServer(psConfig PeerServerConfig, client *Client, registry *Registry, store store.Store, mb *metrics.Bucket, followersStats *raftFollowersStats, serverStats *raftServerStats) *PeerServer {
|
||||
s := &PeerServer{
|
||||
Config: psConfig,
|
||||
client: client,
|
||||
clusterConfig: NewClusterConfig(),
|
||||
registry: registry,
|
||||
store: store,
|
||||
|
@ -247,11 +248,6 @@ func (s *PeerServer) Start(snapshot bool, discoverURL string, peers []string) er
|
|||
}
|
||||
}
|
||||
|
||||
// TODO(yichengq): client for HTTP API usage could use transport other
|
||||
// than the raft one. The transport should have longer timeout because
|
||||
// it doesn't have fault tolerance of raft protocol.
|
||||
s.client = NewClient(s.raftServer.Transporter().(*transporter).transport)
|
||||
|
||||
s.raftServer.Init()
|
||||
|
||||
// Set NOCOW for data directory in btrfs
|
||||
|
|
Loading…
Reference in New Issue