Merge pull request #2903 from xiang90/chord_rafthttp

rafhttp: clean up logging messages
release-2.1
Xiang Li 2015-06-02 14:44:40 -07:00
commit 28878e34ff
4 changed files with 15 additions and 22 deletions

View File

@ -53,15 +53,6 @@ const (
pipelineMsg = "pipeline"
)
var (
bufSizeMap = map[string]int{
streamApp: streamBufSize,
streamAppV2: streamBufSize,
streamMsg: streamBufSize,
pipelineMsg: pipelineBufSize,
}
)
type Peer interface {
// Send sends the message to the remote peer. The function is non-blocking
// and has no promise that the message will be received by the remote.
@ -170,8 +161,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
if isMsgSnap(m) {
p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
}
log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked",
m.Type, p.id, name, bufSizeMap[name])
log.Printf("peer: dropping %s to %s since %s's sending buffer is full", m.Type, p.id, name)
}
case mm := <-p.recvc:
if err := r.Process(context.TODO(), mm); err != nil {

View File

@ -39,7 +39,7 @@ func (g *remote) Send(m raftpb.Message) {
select {
case g.pipeline.msgc <- m:
default:
log.Printf("remote: dropping %s to %s since pipeline with %d-size buffer is blocked", m.Type, g.id, pipelineBufSize)
log.Printf("remote: dropping %s to %s since sending buffer is full", m.Type, g.id)
}
}

View File

@ -134,7 +134,7 @@ func (cw *streamWriter) run() {
if err := enc.encode(linkHeartbeatMessage); err != nil {
reportSentFailure(string(t), linkHeartbeatMessage)
log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
log.Printf("rafthttp: failed to heartbeat on stream %s (%v)", t, err)
cw.close()
heartbeatc, msgc = nil, nil
continue
@ -156,7 +156,7 @@ func (cw *streamWriter) run() {
if err := enc.encode(m); err != nil {
reportSentFailure(string(t), m)
log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
log.Printf("rafthttp: failed to send message on stream %s (%v)", t, err)
cw.close()
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
@ -172,7 +172,7 @@ func (cw *streamWriter) run() {
var err error
msgAppTerm, err = strconv.ParseUint(conn.termStr, 10, 64)
if err != nil {
log.Panicf("rafthttp: unexpected parse term %s error: %v", conn.termStr, err)
log.Panicf("rafthttp: could not parse term %s to uint (%v)", conn.termStr, err)
}
enc = &msgAppEncoder{w: conn.Writer, fs: cw.fs}
case streamTypeMsgAppV2:
@ -280,7 +280,7 @@ func (cr *streamReader) run() {
}
if err != nil {
if err != errUnsupportedStreamType {
log.Printf("rafthttp: roundtripping error: %v", err)
log.Printf("rafthttp: failed to dial stream %s (%v)", t, err)
}
} else {
err := cr.decodeLoop(rc, t)
@ -293,7 +293,7 @@ func (cr *streamReader) run() {
// heartbeat on the idle stream, so it is expected to time out.
case t == streamTypeMsgApp && isNetworkTimeoutError(err):
default:
log.Printf("rafthttp: failed to read message on stream %s due to %v", t, err)
log.Printf("rafthttp: failed to read message on stream %s (%v)", t, err)
}
}
select {
@ -341,7 +341,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
select {
case recvc <- m:
default:
log.Printf("rafthttp: dropping %s from %x because receive buffer is blocked",
log.Printf("rafthttp: dropping %s from %x because receiving buffer is full",
m.Type, m.From)
}
}
@ -386,10 +386,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
uu := u
uu.Path = path.Join(t.endpoint(), cr.from.String())
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("new request to %s error: %v", u, err)
return nil, fmt.Errorf("failed to make http request to %s (%v)", u, err)
}
req.Header.Set("X-Server-From", cr.from.String())
req.Header.Set("X-Server-Version", version.Version)
@ -399,13 +400,15 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
if t == streamTypeMsgApp {
req.Header.Set("X-Raft-Term", strconv.FormatUint(term, 10))
}
cr.mu.Lock()
cr.req = req
cr.mu.Unlock()
resp, err := cr.tr.RoundTrip(req)
if err != nil {
cr.picker.unreachable(u)
return nil, fmt.Errorf("error roundtripping to %s: %v", req.URL, err)
return nil, err
}
rv := serverVersion(resp.Header)
@ -428,7 +431,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
return resp.Body, nil
case http.StatusNotFound:
resp.Body.Close()
return nil, fmt.Errorf("local member has not been added to the peer list of member %s", cr.to)
return nil, fmt.Errorf("remote member %s could not recognize local member", cr.to)
case http.StatusPreconditionFailed:
b, err := ioutil.ReadAll(resp.Body)
if err != nil {

View File

@ -150,7 +150,7 @@ func (t *transport) Send(msgs []raftpb.Message) {
continue
}
log.Printf("etcdserver: send message to unknown receiver %s", to)
log.Printf("rafthttp: ignored message %s (sent to unknown receiver %s)", m.Type, to)
}
}