rafthttp: resetCloser -> close

name 'close' is shorter and more straightforward.
release-2.1
Yicheng Qin 2015-05-14 22:23:40 -07:00
parent fc4543a3fd
commit 8e0992a28b
1 changed files with 10 additions and 10 deletions

View File

@ -121,7 +121,7 @@ func (cw *streamWriter) run() {
reportSentFailure(string(t), linkHeartbeatMessage)
log.Printf("rafthttp: failed to heartbeat on stream %s due to %v. waiting for a new stream to be established.", t, err)
cw.resetCloser()
cw.close()
heartbeatc, msgc = nil, nil
continue
}
@ -131,7 +131,7 @@ func (cw *streamWriter) run() {
if t == streamTypeMsgApp && m.Term != msgAppTerm {
// TODO: reasonable retry logic
if m.Term > msgAppTerm {
cw.resetCloser()
cw.close()
heartbeatc, msgc = nil, nil
// TODO: report to raft at peer level
cw.r.ReportUnreachable(m.To)
@ -143,7 +143,7 @@ func (cw *streamWriter) run() {
reportSentFailure(string(t), m)
log.Printf("rafthttp: failed to send message on stream %s due to %v. waiting for a new stream to be established.", t, err)
cw.resetCloser()
cw.close()
heartbeatc, msgc = nil, nil
cw.r.ReportUnreachable(m.To)
continue
@ -151,7 +151,7 @@ func (cw *streamWriter) run() {
flusher.Flush()
reportSentDuration(string(t), m, time.Since(start))
case conn := <-cw.connc:
cw.resetCloser()
cw.close()
t = conn.t
switch conn.t {
case streamTypeMsgApp:
@ -175,7 +175,7 @@ func (cw *streamWriter) run() {
cw.mu.Unlock()
heartbeatc, msgc = tickc, cw.msgc
case <-cw.stopc:
cw.resetCloser()
cw.close()
close(cw.done)
return
}
@ -188,7 +188,7 @@ func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
return cw.msgc, cw.working
}
func (cw *streamWriter) resetCloser() {
func (cw *streamWriter) close() {
cw.mu.Lock()
defer cw.mu.Unlock()
if !cw.working {
@ -297,7 +297,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser) error {
switch {
case err != nil:
cr.mu.Lock()
cr.resetCloser()
cr.close()
cr.mu.Unlock()
return err
case isLinkHeartbeatMessage(m):
@ -324,7 +324,7 @@ func (cr *streamReader) updateMsgAppTerm(term uint64) {
return
}
cr.msgAppTerm = term
cr.resetCloser()
cr.close()
}
// TODO: always cancel in-flight dial and decode
@ -332,7 +332,7 @@ func (cr *streamReader) stop() {
close(cr.stopc)
cr.mu.Lock()
cr.cancelRequest()
cr.resetCloser()
cr.close()
cr.mu.Unlock()
<-cr.done
}
@ -392,7 +392,7 @@ func (cr *streamReader) cancelRequest() {
}
}
func (cr *streamReader) resetCloser() {
func (cr *streamReader) close() {
if cr.closer != nil {
cr.closer.Close()
}