From 19fc1a71377aa548509f8b791e317a665a9336df Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Thu, 7 May 2015 12:56:23 -0700 Subject: [PATCH] rafthttp: update streamReader term in time Because etcd 2.1 will build stream to any existing peers and etcd 2.0 requires the remote to provide most updated term, it is necessary for streamReader to know the latest term. --- rafthttp/http_test.go | 2 ++ rafthttp/peer.go | 14 +++++++++----- rafthttp/transport.go | 15 +++++++++++++++ 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 53670c933..d11a08822 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -349,6 +349,7 @@ func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } type fakePeer struct { msgs []raftpb.Message urls types.URLs + term uint64 connc chan *outgoingConn } @@ -360,5 +361,6 @@ func newFakePeer() *fakePeer { func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } func (pr *fakePeer) Update(urls types.URLs) { pr.urls = urls } +func (pr *fakePeer) setTerm(term uint64) { pr.term = term } func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } func (pr *fakePeer) Stop() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 4da82f2cc..e12792ac3 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -70,6 +70,8 @@ type Peer interface { Send(m raftpb.Message) // Update updates the urls of remote peer. Update(urls types.URLs) + // setTerm sets the term of ongoing communication. + setTerm(term uint64) // attachOutgoingConn attachs the outgoing connection to the peer for // stream usage. After the call, the ownership of the outgoing // connection hands over to the peer. The peer will close the connection @@ -99,11 +101,13 @@ type peer struct { msgAppWriter *streamWriter writer *streamWriter pipeline *pipeline + msgAppReader *streamReader sendc chan raftpb.Message recvc chan raftpb.Message propc chan raftpb.Message newURLsC chan types.URLs + termc chan uint64 // for testing pausec chan struct{} @@ -125,6 +129,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r recvc: make(chan raftpb.Message, recvBufSize), propc: make(chan raftpb.Message, maxPendingProposals), newURLsC: make(chan types.URLs), + termc: make(chan uint64), pausec: make(chan struct{}), resumec: make(chan struct{}), stopc: make(chan struct{}), @@ -149,7 +154,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r go func() { var paused bool - msgAppReader := startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc) + p.msgAppReader = startStreamReader(tr, picker, streamTypeMsgAppV2, local, to, cid, p.recvc, p.propc, errorc) reader := startStreamReader(tr, picker, streamTypeMessage, local, to, cid, p.recvc, p.propc, errorc) for { select { @@ -169,9 +174,6 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r m.Type, p.id, name, bufSizeMap[name]) } case mm := <-p.recvc: - if mm.Type == raftpb.MsgApp { - msgAppReader.updateMsgAppTerm(mm.Term) - } if err := r.Process(context.TODO(), mm); err != nil { log.Printf("peer: process raft message error: %v", err) } @@ -186,7 +188,7 @@ func startPeer(tr http.RoundTripper, urls types.URLs, local, to, cid types.ID, r p.msgAppWriter.stop() p.writer.stop() p.pipeline.stop() - msgAppReader.stop() + p.msgAppReader.stop() reader.stop() close(p.done) return @@ -211,6 +213,8 @@ func (p *peer) Update(urls types.URLs) { } } +func (p *peer) setTerm(term uint64) { p.msgAppReader.updateMsgAppTerm(term) } + func (p *peer) attachOutgoingConn(conn *outgoingConn) { var ok bool switch conn.t { diff --git a/rafthttp/transport.go b/rafthttp/transport.go index c447790fa..6a78f7145 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -77,6 +77,7 @@ type transport struct { serverStats *stats.ServerStats leaderStats *stats.LeaderStats + term uint64 // the latest term that has been observed mu sync.RWMutex // protect the remote and peer map remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up peers map[types.ID]Peer // peers map @@ -112,6 +113,16 @@ func (t *transport) Get(id types.ID) Peer { return t.peers[id] } +func (t *transport) maybeUpdatePeersTerm(term uint64) { + if t.term >= term { + return + } + t.term = term + for _, p := range t.peers { + p.setTerm(term) + } +} + func (t *transport) Send(msgs []raftpb.Message) { for _, m := range msgs { // intentionally dropped message @@ -120,6 +131,10 @@ func (t *transport) Send(msgs []raftpb.Message) { } to := types.ID(m.To) + if m.Type != raftpb.MsgProp { // proposal message does not have a valid term + t.maybeUpdatePeersTerm(m.Term) + } + p, ok := t.peers[to] if ok { if m.Type == raftpb.MsgApp {