rafthttp: rename to to peerID

release-3.0
Xiang Li 2016-06-01 22:12:47 -07:00
parent c25c00fcf9
commit a047aa4a81
6 changed files with 51 additions and 52 deletions

View File

@ -117,16 +117,16 @@ type peer struct {
stopc chan struct{} stopc chan struct{}
} }
func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.FollowerStats) *peer { func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
plog.Infof("starting peer %s...", id) plog.Infof("starting peer %s...", peerID)
defer plog.Infof("started peer %s", id) defer plog.Infof("started peer %s", peerID)
status := newPeerStatus(id) status := newPeerStatus(peerID)
picker := newURLPicker(urls) picker := newURLPicker(urls)
errorc := transport.ErrorC errorc := transport.ErrorC
r := transport.Raft r := transport.Raft
pipeline := &pipeline{ pipeline := &pipeline{
to: id, peerID: peerID,
tr: transport, tr: transport,
picker: picker, picker: picker,
status: status, status: status,
@ -137,14 +137,14 @@ func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.Fol
pipeline.start() pipeline.start()
p := &peer{ p := &peer{
id: id, id: peerID,
r: r, r: r,
status: status, status: status,
picker: picker, picker: picker,
msgAppV2Writer: startStreamWriter(id, status, fs, r), msgAppV2Writer: startStreamWriter(peerID, status, fs, r),
writer: startStreamWriter(id, status, fs, r), writer: startStreamWriter(peerID, status, fs, r),
pipeline: pipeline, pipeline: pipeline,
snapSender: newSnapshotSender(transport, picker, id, status), snapSender: newSnapshotSender(transport, picker, peerID, status),
sendc: make(chan raftpb.Message), sendc: make(chan raftpb.Message),
recvc: make(chan raftpb.Message, recvBufSize), recvc: make(chan raftpb.Message, recvBufSize),
propc: make(chan raftpb.Message, maxPendingProposals), propc: make(chan raftpb.Message, maxPendingProposals),
@ -183,19 +183,19 @@ func startPeer(transport *Transport, urls types.URLs, id types.ID, fs *stats.Fol
}() }()
p.msgAppV2Reader = &streamReader{ p.msgAppV2Reader = &streamReader{
peerID: peerID,
typ: streamTypeMsgAppV2, typ: streamTypeMsgAppV2,
tr: transport, tr: transport,
picker: picker, picker: picker,
to: id,
status: status, status: status,
recvc: p.recvc, recvc: p.recvc,
propc: p.propc, propc: p.propc,
} }
p.msgAppReader = &streamReader{ p.msgAppReader = &streamReader{
peerID: peerID,
typ: streamTypeMessage, typ: streamTypeMessage,
tr: transport, tr: transport,
picker: picker, picker: picker,
to: id,
status: status, status: status,
recvc: p.recvc, recvc: p.recvc,
propc: p.propc, propc: p.propc,

View File

@ -41,7 +41,7 @@ const (
var errStopped = errors.New("stopped") var errStopped = errors.New("stopped")
type pipeline struct { type pipeline struct {
to types.ID peerID types.ID
tr *Transport tr *Transport
picker *urlPicker picker *urlPicker
@ -64,13 +64,13 @@ func (p *pipeline) start() {
for i := 0; i < connPerPipeline; i++ { for i := 0; i < connPerPipeline; i++ {
go p.handle() go p.handle()
} }
plog.Infof("started HTTP pipelining with peer %s", p.to) plog.Infof("started HTTP pipelining with peer %s", p.peerID)
} }
func (p *pipeline) stop() { func (p *pipeline) stop() {
close(p.stopc) close(p.stopc)
p.wg.Wait() p.wg.Wait()
plog.Infof("stopped HTTP pipelining with peer %s", p.to) plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
} }
func (p *pipeline) handle() { func (p *pipeline) handle() {
@ -140,7 +140,7 @@ func (p *pipeline) post(data []byte) (err error) {
} }
resp.Body.Close() resp.Body.Close()
err = checkPostResponse(resp, b, req, p.to) err = checkPostResponse(resp, b, req, p.peerID)
if err != nil { if err != nil {
p.picker.unreachable(u) p.picker.unreachable(u)
// errMemberRemoved is a critical error since a removed member should // errMemberRemoved is a critical error since a removed member should

View File

@ -301,10 +301,9 @@ func (n *nopReadCloser) Close() error { return nil }
func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline { func startTestPipeline(tr *Transport, picker *urlPicker) *pipeline {
p := &pipeline{ p := &pipeline{
tr: tr, peerID: types.ID(1),
picker: picker, tr: tr,
picker: picker,
to: types.ID(1),
status: newPeerStatus(types.ID(1)), status: newPeerStatus(types.ID(1)),
raft: &fakeRaft{}, raft: &fakeRaft{},
followerStats: &stats.FollowerStats{}, followerStats: &stats.FollowerStats{},

View File

@ -25,11 +25,11 @@ type remote struct {
pipeline *pipeline pipeline *pipeline
} }
func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote { func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
picker := newURLPicker(urls) picker := newURLPicker(urls)
status := newPeerStatus(to) status := newPeerStatus(id)
pipeline := &pipeline{ pipeline := &pipeline{
to: to, peerID: id,
tr: tr, tr: tr,
picker: picker, picker: picker,
status: status, status: status,
@ -39,7 +39,7 @@ func startRemote(tr *Transport, urls types.URLs, to types.ID) *remote {
pipeline.start() pipeline.start()
return &remote{ return &remote{
id: to, id: id,
status: status, status: status,
pipeline: pipeline, pipeline: pipeline,
} }

View File

@ -97,7 +97,7 @@ type outgoingConn struct {
// streamWriter writes messages to the attached outgoingConn. // streamWriter writes messages to the attached outgoingConn.
type streamWriter struct { type streamWriter struct {
id types.ID peerID types.ID
status *peerStatus status *peerStatus
fs *stats.FollowerStats fs *stats.FollowerStats
r Raft r Raft
@ -116,7 +116,7 @@ type streamWriter struct {
// messages and writes to the attached outgoing connection. // messages and writes to the attached outgoing connection.
func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter { func startStreamWriter(id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
w := &streamWriter{ w := &streamWriter{
id: id, peerID: id,
status: status, status: status,
fs: fs, fs: fs,
r: r, r: r,
@ -141,7 +141,7 @@ func (cw *streamWriter) run() {
tickc := time.Tick(ConnReadTimeout / 3) tickc := time.Tick(ConnReadTimeout / 3)
unflushed := 0 unflushed := 0
plog.Infof("started streaming with peer %s (writer)", cw.id) plog.Infof("started streaming with peer %s (writer)", cw.peerID)
for { for {
select { select {
@ -151,7 +151,7 @@ func (cw *streamWriter) run() {
if err == nil { if err == nil {
flusher.Flush() flusher.Flush()
batched = 0 batched = 0
sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0 unflushed = 0
continue continue
} }
@ -159,7 +159,7 @@ func (cw *streamWriter) run() {
cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error()) cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
cw.close() cw.close()
plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
heartbeatc, msgc = nil, nil heartbeatc, msgc = nil, nil
case m := <-msgc: case m := <-msgc:
@ -169,7 +169,7 @@ func (cw *streamWriter) run() {
if len(msgc) == 0 || batched > streamBufSize/2 { if len(msgc) == 0 || batched > streamBufSize/2 {
flusher.Flush() flusher.Flush()
sentBytes.WithLabelValues(cw.id.String()).Add(float64(unflushed)) sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
unflushed = 0 unflushed = 0
batched = 0 batched = 0
} else { } else {
@ -181,13 +181,13 @@ func (cw *streamWriter) run() {
cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error()) cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
cw.close() cw.close()
plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.id, t) plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
heartbeatc, msgc = nil, nil heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To) cw.r.ReportUnreachable(m.To)
case conn := <-cw.connc: case conn := <-cw.connc:
if cw.close() { if cw.close() {
plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.id, t) plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
} }
t = conn.t t = conn.t
switch conn.t { switch conn.t {
@ -205,14 +205,14 @@ func (cw *streamWriter) run() {
cw.closer = conn.Closer cw.closer = conn.Closer
cw.working = true cw.working = true
cw.mu.Unlock() cw.mu.Unlock()
plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.id, t) plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
heartbeatc, msgc = tickc, cw.msgc heartbeatc, msgc = tickc, cw.msgc
case <-cw.stopc: case <-cw.stopc:
if cw.close() { if cw.close() {
plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.id, t) plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
} }
close(cw.done) close(cw.done)
plog.Infof("stopped streaming with peer %s (writer)", cw.id) plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
return return
} }
} }
@ -232,7 +232,7 @@ func (cw *streamWriter) close() bool {
} }
cw.closer.Close() cw.closer.Close()
if len(cw.msgc) > 0 { if len(cw.msgc) > 0 {
cw.r.ReportUnreachable(uint64(cw.id)) cw.r.ReportUnreachable(uint64(cw.peerID))
} }
cw.msgc = make(chan raftpb.Message, streamBufSize) cw.msgc = make(chan raftpb.Message, streamBufSize)
cw.working = false cw.working = false
@ -256,11 +256,11 @@ func (cw *streamWriter) stop() {
// streamReader is a long-running go-routine that dials to the remote stream // streamReader is a long-running go-routine that dials to the remote stream
// endpoint and reads messages from the response body returned. // endpoint and reads messages from the response body returned.
type streamReader struct { type streamReader struct {
typ streamType peerID types.ID
typ streamType
tr *Transport tr *Transport
picker *urlPicker picker *urlPicker
to types.ID
status *peerStatus status *peerStatus
recvc chan<- raftpb.Message recvc chan<- raftpb.Message
propc chan<- raftpb.Message propc chan<- raftpb.Message
@ -288,7 +288,7 @@ func (r *streamReader) start() {
func (cr *streamReader) run() { func (cr *streamReader) run() {
t := cr.typ t := cr.typ
plog.Infof("started streaming with peer %s (%s reader)", cr.to, t) plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
for { for {
rc, err := cr.dial(t) rc, err := cr.dial(t)
if err != nil { if err != nil {
@ -297,9 +297,9 @@ func (cr *streamReader) run() {
} }
} else { } else {
cr.status.activate() cr.status.activate()
plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
err := cr.decodeLoop(rc, t) err := cr.decodeLoop(rc, t)
plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.to, cr.typ) plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
switch { switch {
// all data is read out // all data is read out
case err == io.EOF: case err == io.EOF:
@ -315,7 +315,7 @@ func (cr *streamReader) run() {
case <-time.After(100 * time.Millisecond): case <-time.After(100 * time.Millisecond):
case <-cr.stopc: case <-cr.stopc:
close(cr.done) close(cr.done)
plog.Infof("stopped streaming with peer %s (%s reader)", cr.to, t) plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
return return
} }
} }
@ -326,7 +326,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
cr.mu.Lock() cr.mu.Lock()
switch t { switch t {
case streamTypeMsgAppV2: case streamTypeMsgAppV2:
dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.to) dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
case streamTypeMessage: case streamTypeMessage:
dec = &messageDecoder{r: rc} dec = &messageDecoder{r: rc}
default: default:
@ -402,7 +402,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
req.Header.Set("X-Server-Version", version.Version) req.Header.Set("X-Server-Version", version.Version)
req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion) req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String()) req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
req.Header.Set("X-Raft-To", cr.to.String()) req.Header.Set("X-Raft-To", cr.peerID.String())
setPeerURLsHeader(req, cr.tr.URLs) setPeerURLsHeader(req, cr.tr.URLs)
@ -445,7 +445,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
case http.StatusNotFound: case http.StatusNotFound:
httputil.GracefulClose(resp) httputil.GracefulClose(resp)
cr.picker.unreachable(u) cr.picker.unreachable(u)
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to) return nil, fmt.Errorf("peer %s faild to fine local node %s", cr.peerID, cr.tr.ID)
case http.StatusPreconditionFailed: case http.StatusPreconditionFailed:
b, err := ioutil.ReadAll(resp.Body) b, err := ioutil.ReadAll(resp.Body)
if err != nil { if err != nil {
@ -457,11 +457,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
switch strings.TrimSuffix(string(b), "\n") { switch strings.TrimSuffix(string(b), "\n") {
case errIncompatibleVersion.Error(): case errIncompatibleVersion.Error():
plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.to) plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
return nil, errIncompatibleVersion return nil, errIncompatibleVersion
case errClusterIDMismatch.Error(): case errClusterIDMismatch.Error():
plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)", plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
cr.to, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID) cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
return nil, errClusterIDMismatch return nil, errClusterIDMismatch
default: default:
return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b)) return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))

View File

@ -116,9 +116,9 @@ func TestStreamReaderDialRequest(t *testing.T) {
for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} { for i, tt := range []streamType{streamTypeMessage, streamTypeMsgAppV2} {
tr := &roundTripperRecorder{} tr := &roundTripperRecorder{}
sr := &streamReader{ sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
to: types.ID(2),
} }
sr.dial(tt) sr.dial(tt)
@ -164,9 +164,9 @@ func TestStreamReaderDialResult(t *testing.T) {
err: tt.err, err: tt.err,
} }
sr := &streamReader{ sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
to: types.ID(2),
errorc: make(chan error, 1), errorc: make(chan error, 1),
} }
@ -190,9 +190,9 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
header: http.Header{}, header: http.Header{},
} }
sr := &streamReader{ sr := &streamReader{
peerID: types.ID(2),
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
to: types.ID(2),
} }
_, err := sr.dial(typ) _, err := sr.dial(typ)
@ -251,11 +251,11 @@ func TestStream(t *testing.T) {
tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)} tr := &Transport{streamRt: &http.Transport{}, ClusterID: types.ID(1)}
sr := &streamReader{ sr := &streamReader{
peerID: types.ID(2),
typ: tt.t, typ: tt.t,
tr: tr, tr: tr,
picker: picker, picker: picker,
to: types.ID(2), status: newPeerStatus(types.ID(2)),
status: newPeerStatus(types.ID(1)),
recvc: recvc, recvc: recvc,
propc: propc, propc: propc,
} }