// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafthttp

import (
	"time"

	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
	"github.com/coreos/etcd/etcdserver/stats"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
)

const (
	// ConnReadTimeout and ConnWriteTimeout are the i/o timeouts set on each
	// connection the rafthttp pkg creates.
	// A 5 second timeout is good enough for recycling bad connections.
	// Otherwise we would have to wait for tcp keepalive to fail before a bad
	// connection is detected, which takes minutes.
	// For long term streaming connections, rafthttp pkg sends application level
	// linkHeartbeatMessage to keep the connection alive.
	// For short term pipeline connections, the connection MUST be killed to
	// avoid it being put back to http pkg connection pool.
	ConnReadTimeout  = 5 * time.Second
	ConnWriteTimeout = 5 * time.Second

	// recvBufSize is the buffer capacity of each peer's recvc channel.
	recvBufSize = 4096
	// maxPendingProposals is the buffer capacity of each peer's propc channel,
	// i.e. the number of proposals that can be held during one leader election
	// process.
	// Generally one leader election takes at most 1 sec. It should have
	// 0-2 election conflicts, and each one takes 0.5 sec.
	// We assume the number of concurrent proposers is smaller than 4096.
	// One client blocks on its proposal for at least 1 sec, so 4096 is enough
	// to hold all proposals.
	maxPendingProposals = 4096

	// streamAppV2, streamMsg and pipelineMsg are the names peer.pick returns
	// alongside the chosen sending channel; they appear in the log messages
	// emitted when a message is dropped.
	streamAppV2 = "streamMsgAppV2"
	streamMsg   = "streamMsg"
	pipelineMsg = "pipeline"
	// sendSnap is not referenced in this file.
	// NOTE(review): presumably the equivalent name for the snapshot sender;
	// confirm at its use site.
	sendSnap = "sendMsgSnap"
)

// Peer is the set of operations available on the local representative of a
// remote raft node.
type Peer interface {
	// send sends the message to the remote peer. The function is non-blocking
	// and has no promise that the message will be received by the remote.
	// When it fails to send message out, it will report the status to underlying
	// raft.
	send(m raftpb.Message)

	// sendSnap sends the merged snapshot message to the remote peer. Its behavior
	// is similar to send.
	sendSnap(m snap.Message)

	// update updates the urls of remote peer.
	update(urls types.URLs)

	// attachOutgoingConn attaches the outgoing connection to the peer for
	// stream usage. After the call, the ownership of the outgoing
	// connection hands over to the peer. The peer will close the connection
	// when it is no longer used.
	attachOutgoingConn(conn *outgoingConn)

	// activeSince returns the time that the connection with the
	// peer becomes active.
	activeSince() time.Time

	// stop performs any necessary finalization and terminates the peer
	// gracefully.
	stop()
}

// peer is the representative of a remote raft node. Local raft node sends
// messages to the remote through peer.
// Each peer has two underlying mechanisms to send out a message: stream and
// pipeline.
// A stream is a receiver-initialized long-polling connection, which
// is always open to transfer messages. Besides the general stream, peer also
// has an optimized stream for sending msgApp since msgApp accounts for a large
// part of all messages. Only the raft leader uses the optimized stream to send
// msgApp to the remote follower node.
// A pipeline is a series of http clients that send http requests to the remote.
// It is only used when the stream has not been established.
type peer struct {
	// id of the remote raft peer node
	id types.ID
	// r is handed every received message via Process and is told about
	// unreachable peers and failed snapshots when a message is dropped.
	r Raft
	// v3demo records the v3demo argument given to startPeer.
	v3demo bool

	// status is created in startPeer and shared with the stream writers,
	// pipeline, snapshot sender and stream readers.
	status *peerStatus

	// picker is created from the initial urls and refreshed whenever new
	// urls arrive on newURLsC.
	picker *urlPicker

	// msgAppV2Writer is the stream writer that streamTypeMsgAppV2
	// connections are attached to; pick selects it only for MsgApp.
	msgAppV2Writer *streamWriter
	// writer is the stream writer that streamTypeMessage connections are
	// attached to.
	writer *streamWriter
	// pipeline is what pick falls back to when no stream channel is
	// available, and what it always selects for MsgSnap.
	pipeline *pipeline
	// snapshot sender to send v3 snapshot messages
	snapSender *snapshotSender
	// msgAppV2Reader is the stream reader started for streamTypeMsgAppV2.
	msgAppV2Reader *streamReader

	// sendc carries messages from send to the run loop; it is unbuffered.
	sendc chan raftpb.Message
	// recvc (capacity recvBufSize) is handed to both stream readers; the run
	// loop passes whatever arrives on it to r.Process.
	recvc chan raftpb.Message
	// propc (capacity maxPendingProposals) is handed to both stream readers
	// and drained by a dedicated goroutine so that a blocking Process call
	// does not stall the run loop.
	propc chan raftpb.Message
	// newURLsC carries url updates from update to the run loop.
	newURLsC chan types.URLs

	// for testing
	pausec  chan struct{}
	resumec chan struct{}

	// stopc is closed by stop to request shutdown.
	stopc chan struct{}
	// done is closed by the run loop once teardown has finished.
	done chan struct{}
}

// startPeer builds the peer that represents remote node `to` and starts its
// background goroutines.
//
// urls seeds the URL picker. transport, local, to and cid are forwarded to the
// pipeline, snapshot sender and stream readers; fs is forwarded to the stream
// writers and the pipeline; errorc is forwarded to the pipeline, snapshot
// sender and stream readers. r is the Raft that received messages are
// processed by and that send failures are reported to.
//
// The returned peer runs until stop is called.
func startPeer(transport *Transport, urls types.URLs, local, to, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error, v3demo bool) *peer {
	status := newPeerStatus(to)
	picker := newURLPicker(urls)
	p := &peer{
		id:             to,
		r:              r,
		v3demo:         v3demo,
		status:         status,
		picker:         picker,
		msgAppV2Writer: startStreamWriter(to, status, fs, r),
		writer:         startStreamWriter(to, status, fs, r),
		pipeline:       newPipeline(transport, picker, local, to, cid, status, fs, r, errorc),
		snapSender:     newSnapshotSender(transport, picker, local, to, cid, status, r, errorc),
		sendc:          make(chan raftpb.Message),
		recvc:          make(chan raftpb.Message, recvBufSize),
		propc:          make(chan raftpb.Message, maxPendingProposals),
		newURLsC:       make(chan types.URLs),
		pausec:         make(chan struct{}),
		resumec:        make(chan struct{}),
		stopc:          make(chan struct{}),
		done:           make(chan struct{}),
	}

	// Use a separate goroutine to process MsgProp because it is
	// blocking when there is no leader.
	// ctx is cancelled by the run loop on stop; presumably that is what
	// unblocks a Process call still waiting at shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		for {
			select {
			case mm := <-p.propc:
				if err := r.Process(ctx, mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case <-p.stopc:
				return
			}
		}
	}()

	// Both readers feed the same recvc and propc channels.
	p.msgAppV2Reader = startStreamReader(transport, picker, streamTypeMsgAppV2, local, to, cid, status, p.recvc, p.propc, errorc)
	reader := startStreamReader(transport, picker, streamTypeMessage, local, to, cid, status, p.recvc, p.propc, errorc)
	// Run loop: serializes sending, receiving, url updates, pause/resume and
	// shutdown for this peer.
	go func() {
		var paused bool
		for {
			select {
			case m := <-p.sendc:
				if paused {
					// While paused, outgoing messages are discarded silently.
					continue
				}
				writec, name := p.pick(m)
				select {
				case writec <- m:
				default:
					// The chosen channel cannot accept the message right
					// now: drop it and let raft know instead of blocking.
					p.r.ReportUnreachable(m.To)
					if isMsgSnap(m) {
						p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
					}
					if status.isActive() {
						plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
					}
					plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
				}
			case mm := <-p.recvc:
				if err := r.Process(context.TODO(), mm); err != nil {
					plog.Warningf("failed to process raft message (%v)", err)
				}
			case urls := <-p.newURLsC:
				picker.update(urls)
			case <-p.pausec:
				paused = true
			case <-p.resumec:
				paused = false
			case <-p.stopc:
				// Cancel the proposal context first, then stop every
				// component; done is closed last so that stop returns only
				// after all of them have been stopped.
				cancel()
				p.msgAppV2Writer.stop()
				p.writer.stop()
				p.pipeline.stop()
				p.snapSender.stop()
				p.msgAppV2Reader.stop()
				reader.stop()
				close(p.done)
				return
			}
		}
	}()

	return p
}

// send hands m to the run loop, which picks a channel and attempts a
// non-blocking write to it. send itself waits until the run loop accepts m or
// the peer is done.
func (p *peer) send(m raftpb.Message) {
	select {
	case p.sendc <- m:
	case <-p.done:
	}
}

// sendSnap passes m to the snapshot sender in a new goroutine, so the caller
// does not wait for the send to finish.
func (p *peer) sendSnap(m snap.Message) {
	go p.snapSender.send(m)
}

// update delivers the new urls to the run loop, which applies them to the URL
// picker. It returns without effect if the peer is done.
func (p *peer) update(urls types.URLs) {
	select {
	case p.newURLsC <- urls:
	case <-p.done:
	}
}

// attachOutgoingConn attaches conn to the stream writer matching its stream
// type. It panics (via plog.Panicf) on an unknown stream type and closes conn
// when the writer does not accept it.
func (p *peer) attachOutgoingConn(conn *outgoingConn) {
	var ok bool
	switch conn.t {
	case streamTypeMsgAppV2:
		ok = p.msgAppV2Writer.attach(conn)
	case streamTypeMessage:
		ok = p.writer.attach(conn)
	default:
		plog.Panicf("unhandled stream type %s", conn.t)
	}
	if !ok {
		conn.Close()
	}
}

// activeSince returns the activeSince value recorded in the peer's status.
// NOTE(review): the field is read directly here; confirm whether peerStatus
// expects its readers to synchronize with whoever updates it.
func (p *peer) activeSince() time.Time { return p.status.activeSince }

//
Pause pauses the peer. The peer will simply drops all incoming // messages without returning an error. func (p *peer) Pause() { select { case p.pausec <- struct{}{}: case <-p.done: } } // Resume resumes a paused peer. func (p *peer) Resume() { select { case p.resumec <- struct{}{}: case <-p.done: } } func (p *peer) stop() { close(p.stopc) <-p.done } // pick picks a chan for sending the given message. The picked chan and the picked chan // string name are returned. func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) { var ok bool // Considering MsgSnap may have a big size, e.g., 1G, and will block // stream for a long time, only use one of the N pipelines to send MsgSnap. if isMsgSnap(m) { return p.pipeline.msgc, pipelineMsg } else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) { return writec, streamAppV2 } else if writec, ok = p.writer.writec(); ok { return writec, streamMsg } return p.pipeline.msgc, pipelineMsg } func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }