diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 53670c933..d11a08822 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -349,6 +349,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message urls types.URLs + term uint64 connc chan *outgoingConn } @@ -360,5 +361,6 @@ func newFakePeer() *fakePeer { func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls } +func (pr *fakePeer) setTerm(term uint64) { pr.term = term } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) Stop() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 4da82f2cc..e12792ac3 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -70,6 +70,8 @@ type Peer interface { Send(m raftpb.Message) // Update updates the urls of remote peer. Update(urls types.URLs) + // setTerm sets the term of ongoing communication. + setTerm(term uint64) // attachOutgoingConn attachs the outgoing connection to the peer for // stream usage. After the call, the ownership of the outgoing // connection hands over to the peer. The peer will close the connection @@ -99,11 +101,13 @@ type peer struct { msgAppWriter *streamWriter writer *streamWriter pipeline *pipeline + msgAppReader *streamReader sendc chan raftpb.Message recvc chan raftpb.Message propc chan raftpb.Message newURLsC chan types.URLs + termc chan uint64 // for testing pausec chan struct{} @@ -125,6 +129,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), newURLsC: make(chan types.URLs), + termc: make(chan uint64), pausec: make(chan struct{}), resumec: make(chan struct{}), stopc: make(chan struct{}), @@ -149,7 +154,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r go func() { var paused bool - msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc) + p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc) reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc) for { select { @@ -169,9 +174,6 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r m.Type, p.id, name, bufSizeMap[name]) } case mm := <-p.recvc: - if mm.Type == raftpb.MsgApp { - msgAppReader.updateMsgAppTerm(mm.Term) - } if err := r.Process(context.TODO(), mm); err != nil { log.Printf("peer: process raft message error: %v", err) } @@ -186,7 +188,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r p.msgAppWriter.stop() p.writer.stop() p.pipeline.stop() - msgAppReader.stop() + p.msgAppReader.stop() reader.stop() close(p.done) return @@ -211,6 +213,8 @@ func (p *peer) Update(urls types.URLs) { } } +func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) } + func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { diff --git a/rafthttp/transport.go b/rafthttp/transport.go index c447790fa..6a78f7145 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -77,6 +77,7 @@ type transport struct { serverStats *stats.ServerStats leaderStats *stats.LeaderStats + term uint64 // the latest term that has been observed mu sync.RWMutex // protect the remote and peer map remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map @@ -112,6 +113,16 @@ func (t *transport) Get(id types.ID) Peer { return t.peers[id] } +func (t *transport) maybeUpdatePeersTerm(term uint64) { + if t.term >= term { + return + } + t.term = term + for _, p := range t.peers { + p.setTerm(term) + } +} + func (t *transport) Send(msgs []raftpb.Message) { for _, m := range msgs { // intentionally dropped message @@ -120,6 +131,10 @@ func (t *transport) Send(msgs []raftpb.Message) { } to := types.ID(m.To) + if m.Type != raftpb.MsgProp { // proposal message does not have a valid term + t.maybeUpdatePeersTerm(m.Term) + } + p, ok := t.peers[to] if ok { if m.Type == raftpb.MsgApp {