diff --git a/rafthttp/http.go b/rafthttp/http.go index 213107129..1a65028e1 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -159,14 +159,14 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.(http.Flusher).Flush() - stream := newStreamServer(w.(WriteFlusher), from, term) - err = p.attachStream(stream) + sw := newStreamWriter(w.(WriteFlusher), from, term) + err = p.attachStream(sw) if err != nil { log.Printf("rafthttp: %v", err) http.Error(w, err.Error(), http.StatusBadRequest) return } - <-stream.stopNotify() + <-sw.stopNotify() } type writerToResponse interface { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 6f0c8a829..4a648da7f 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -18,6 +18,7 @@ package rafthttp import ( "bytes" + "errors" "fmt" "log" "net/http" @@ -46,6 +47,8 @@ const ( ) type peer struct { + sync.Mutex + id types.ID cid types.ID @@ -63,13 +66,14 @@ type peer struct { // wait for the handling routines wg sync.WaitGroup - mu sync.RWMutex - u string // the url this sender post to - // if the last send was successful, thi sender is active. + // the url this sender post to + u string + // if the last send was successful, the sender is active. // Or it is inactive active bool errored error paused bool + stopped bool } func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, fs *stats.FollowerStats, errorc chan error) *peer { @@ -95,8 +99,12 @@ func NewPeer(tr http.RoundTripper, u string, id types.ID, cid types.ID, r Raft, } func (p *peer) Update(u string) { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() + if p.stopped { + // TODO: not panic here? + panic("peer: update a stopped peer") + } p.u = u } @@ -104,19 +112,20 @@ func (p *peer) Update(u string) { // It may be fail to send data if it returns nil error. // TODO (xiangli): reasonable retry logic func (p *peer) Send(m raftpb.Message) error { - p.mu.RLock() - pause := p.paused - p.mu.RUnlock() - if pause { + p.Lock() + defer p.Unlock() + if p.stopped { + return errors.New("peer: stopped") + } + if p.paused { return nil } // move all the stream related stuff into stream p.stream.invalidate(m.Term) if shouldInitStream(m) && !p.stream.isOpen() { - p.mu.Lock() u := p.u - p.mu.Unlock() + // todo: steam open should not block. p.stream.open(types.ID(m.From), p.id, p.cid, m.Term, p.tr, u, p.r) p.batcher.Reset(time.Now()) } @@ -168,7 +177,11 @@ func (p *peer) send(m raftpb.Message) error { func (p *peer) Stop() { close(p.q) p.wg.Wait() + + p.Lock() + defer p.Unlock() p.stream.stop() + p.stopped = true } func (p *peer) handle() { @@ -178,7 +191,7 @@ func (p *peer) handle() { err := p.post(pbutil.MustMarshal(m)) end := time.Now() - p.mu.Lock() + p.Lock() if err != nil { if p.errored == nil || p.errored.Error() != err.Error() { log.Printf("sender: error posting to %s: %v", p.id, err) @@ -201,16 +214,16 @@ func (p *peer) handle() { p.fs.Succ(end.Sub(start)) } } - p.mu.Unlock() + p.Unlock() } } // post POSTs a data payload to a url. Returns nil if the POST succeeds, // error on any failure. func (p *peer) post(data []byte) error { - p.mu.RLock() + p.Lock() req, err := http.NewRequest("POST", p.u, bytes.NewBuffer(data)) - p.mu.RUnlock() + p.Unlock() if err != nil { return err } @@ -245,23 +258,29 @@ func (p *peer) post(data []byte) error { } // attachStream attaches a streamSever to the peer. -func (p *peer) attachStream(server *streamServer) error { - server.fs = p.fs - return p.stream.attach(server) +func (p *peer) attachStream(sw *streamWriter) error { + p.Lock() + defer p.Unlock() + if p.stopped { + return errors.New("peer: stopped") + } + + sw.fs = p.fs + return p.stream.attach(sw) } // Pause pauses the peer. The peer will simply drops all incoming // messages without retruning an error. func (p *peer) Pause() { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() p.paused = true } // Resume resumes a paused peer. func (p *peer) Resume() { - p.mu.Lock() - defer p.mu.Unlock() + p.Lock() + defer p.Unlock() p.paused = false } diff --git a/rafthttp/streamer.go b/rafthttp/streamer.go index 24866eb95..942e94521 100644 --- a/rafthttp/streamer.go +++ b/rafthttp/streamer.go @@ -17,6 +17,7 @@ package rafthttp import ( + "errors" "fmt" "io" "log" @@ -41,60 +42,70 @@ const ( // TODO: a stream might hava one stream server or one stream client, but not both. type stream struct { - // the server might be attached asynchronously with the owner of the stream - // use a mutex to protect it sync.Mutex - server *streamServer - - client *streamClient + w *streamWriter + r *streamReader + stopped bool } func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error { - if s.client != nil { - panic("open: stream is open") - } - - c, err := newStreamClient(from, to, cid, term, tr, u, r) + rd, err := newStreamReader(from, to, cid, term, tr, u, r) if err != nil { log.Printf("stream: error opening stream: %v", err) return err } - s.client = c + + s.Lock() + defer s.Unlock() + if s.stopped { + rd.stop() + return errors.New("stream: stopped") + } + if s.r != nil { + panic("open: stream is open") + } + s.r = rd return nil } -func (s *stream) attach(server *streamServer) error { +func (s *stream) attach(sw *streamWriter) error { s.Lock() defer s.Unlock() - if s.server != nil { - // ignore lower-term streaming request - if server.term < s.server.term { - return fmt.Errorf("cannot attach out of data stream server [%d / %d]", server.term, s.server.term) - } - s.server.stop() + if s.stopped { + return errors.New("stream: stopped") } - s.server = server + if s.w != nil { + // ignore lower-term streaming request + if sw.term < s.w.term { + return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term) + } + s.w.stop() + } + s.w = sw return nil } func (s *stream) write(m raftpb.Message) bool { s.Lock() defer s.Unlock() - if s.server == nil { + if s.stopped { return false } - if m.Term != s.server.term { - if m.Term > s.server.term { + if s.w == nil { + return false + } + if m.Term != s.w.term { + if m.Term > s.w.term { panic("expected server to be invalidated when there is a higher term message") } return false } // todo: early unlock? - if err := s.server.send(m.Entries); err != nil { + if err := s.w.send(m.Entries); err != nil { log.Printf("stream: error sending message: %v", err) log.Printf("stream: stopping the stream server...") - s.server.stop() - s.server = nil + s.w.stop() + s.w = nil return false } return true @@ -105,19 +116,21 @@ func (s *stream) write(m raftpb.Message) bool { func (s *stream) invalidate(term uint64) { s.Lock() defer s.Unlock() - - if s.server != nil { - if s.server.term < term { - s.server.stop() - s.server = nil + if s.w != nil { + if s.w.term < term { + s.w.stop() + s.w = nil } } - if s.client != nil { - if s.client.term < term { - s.client.stop() - s.client = nil + if s.r != nil { + if s.r.term < term { + s.r.stop() + s.r = nil } } + if term == math.MaxUint64 { + s.stopped = true + } } func (s *stream) stop() { @@ -125,10 +138,12 @@ func (s *stream) stop() { } func (s *stream) isOpen() bool { - if s.client != nil && s.client.isStopped() { - s.client = nil + s.Lock() + defer s.Unlock() + if s.r != nil && s.r.isStopped() { + s.r = nil } - return s.client != nil + return s.r != nil } type WriteFlusher interface { @@ -136,9 +151,8 @@ type WriteFlusher interface { http.Flusher } -// TODO: rename it to streamWriter. // TODO: replace fs with stream stats -type streamServer struct { +type streamWriter struct { to types.ID term uint64 fs *stats.FollowerStats @@ -148,8 +162,8 @@ type streamServer struct { // newStreamServer starts and returns a new started stream server. // The caller should call stop when finished, to shut it down. -func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer { - s := &streamServer{ +func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter { + s := &streamWriter{ to: to, term: term, q: make(chan []raftpb.Entry, streamBufSize), @@ -159,7 +173,7 @@ func newStreamServer(w WriteFlusher, to types.ID, term uint64) *streamServer { return s } -func (s *streamServer) send(ents []raftpb.Entry) error { +func (s *streamWriter) send(ents []raftpb.Entry) error { select { case <-s.done: return fmt.Errorf("stopped") @@ -174,7 +188,7 @@ func (s *streamServer) send(ents []raftpb.Entry) error { } } -func (s *streamServer) handle(w WriteFlusher) { +func (s *streamWriter) handle(w WriteFlusher) { defer func() { close(s.done) log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term) @@ -192,16 +206,15 @@ func (s *streamServer) handle(w WriteFlusher) { } } -func (s *streamServer) stop() { +func (s *streamWriter) stop() { close(s.q) <-s.done } -func (s *streamServer) stopNotify() <-chan struct{} { return s.done } +func (s *streamWriter) stopNotify() <-chan struct{} { return s.done } -// TODO: rename it to streamReader. // TODO: move the raft interface out of the reader. -type streamClient struct { +type streamReader struct { id types.ID to types.ID term uint64 @@ -213,8 +226,8 @@ type streamClient struct { // newStreamClient starts and returns a new started stream client. // The caller should call stop when finished, to shut it down. -func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamClient, error) { - s := &streamClient{ +func newStreamReader(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamReader, error) { + s := &streamReader{ id: id, to: to, term: term, @@ -248,12 +261,12 @@ func newStreamClient(id, to, cid types.ID, term uint64, tr http.RoundTripper, u return s, nil } -func (s *streamClient) stop() { +func (s *streamReader) stop() { s.closer.Close() <-s.done } -func (s *streamClient) isStopped() bool { +func (s *streamReader) isStopped() bool { select { case <-s.done: return true @@ -262,7 +275,7 @@ func (s *streamClient) isStopped() bool { } } -func (s *streamClient) handle(r io.Reader) { +func (s *streamReader) handle(r io.Reader) { defer func() { close(s.done) log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)