Merge pull request #2483 from yichengq/335
rafthttp: report unreachable when dropping messagesrelease-2.1
commit
62a7e2f41f
|
@ -91,6 +91,7 @@ type Peer interface {
|
|||
type peer struct {
|
||||
// id of the remote raft peer node
|
||||
id types.ID
|
||||
r Raft
|
||||
|
||||
msgAppWriter *streamWriter
|
||||
writer *streamWriter
|
||||
|
@ -113,6 +114,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
|
|||
picker := newURLPicker(urls)
|
||||
p := &peer{
|
||||
id: to,
|
||||
r: r,
|
||||
msgAppWriter: startStreamWriter(fs, r),
|
||||
writer: startStreamWriter(fs, r),
|
||||
pipeline: newPipeline(tr, picker, to, cid, fs, r, errorc),
|
||||
|
@ -156,6 +158,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r
|
|||
select {
|
||||
case writec <- m:
|
||||
default:
|
||||
p.r.ReportUnreachable(m.To)
|
||||
log.Printf("peer: dropping %s to %s since %s with %d-size buffer is blocked",
|
||||
m.Type, p.id, name, bufSizeMap[name])
|
||||
}
|
||||
|
|
|
@ -116,6 +116,8 @@ func (cw *streamWriter) run() {
|
|||
if m.Term > msgAppTerm {
|
||||
cw.resetCloser()
|
||||
heartbeatc, msgc = nil, nil
|
||||
// TODO: report to raft at peer level
|
||||
cw.r.ReportUnreachable(m.To)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue