etcd/rafthttp/streamer.go

324 lines
7.2 KiB
Go

/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rafthttp
import (
"errors"
"fmt"
"io"
"log"
"math"
"net/http"
"net/url"
"path"
"strconv"
"sync"
"time"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
)
const (
streamBufSize = 4096
)
// TODO: a stream might hava one stream server or one stream client, but not both.
type stream struct {
sync.Mutex
w *streamWriter
r *streamReader
stopped bool
}
func (s *stream) open(from, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) error {
rd, err := newStreamReader(from, to, cid, term, tr, u, r)
if err != nil {
log.Printf("stream: error opening stream: %v", err)
return err
}
s.Lock()
defer s.Unlock()
if s.stopped {
rd.stop()
return errors.New("stream: stopped")
}
if s.r != nil {
panic("open: stream is open")
}
s.r = rd
return nil
}
func (s *stream) attach(sw *streamWriter) error {
s.Lock()
defer s.Unlock()
if s.stopped {
return errors.New("stream: stopped")
}
if s.w != nil {
// ignore lower-term streaming request
if sw.term < s.w.term {
return fmt.Errorf("cannot attach out of data stream server [%d / %d]", sw.term, s.w.term)
}
s.w.stop()
}
s.w = sw
return nil
}
func (s *stream) write(m raftpb.Message) bool {
s.Lock()
defer s.Unlock()
if s.stopped {
return false
}
if s.w == nil {
return false
}
if m.Term != s.w.term {
if m.Term > s.w.term {
panic("expected server to be invalidated when there is a higher term message")
}
return false
}
// todo: early unlock?
if err := s.w.send(m.Entries); err != nil {
log.Printf("stream: error sending message: %v", err)
log.Printf("stream: stopping the stream server...")
s.w.stop()
s.w = nil
return false
}
return true
}
// invalidate stops the sever/client that is running at
// a term lower than the given term.
func (s *stream) invalidate(term uint64) {
s.Lock()
defer s.Unlock()
if s.w != nil {
if s.w.term < term {
s.w.stop()
s.w = nil
}
}
if s.r != nil {
if s.r.term < term {
s.r.stop()
s.r = nil
}
}
if term == math.MaxUint64 {
s.stopped = true
}
}
func (s *stream) stop() {
s.invalidate(math.MaxUint64)
}
func (s *stream) isOpen() bool {
s.Lock()
defer s.Unlock()
if s.r != nil && s.r.isStopped() {
s.r = nil
}
return s.r != nil
}
type WriteFlusher interface {
io.Writer
http.Flusher
}
// TODO: replace fs with stream stats
type streamWriter struct {
to types.ID
term uint64
fs *stats.FollowerStats
q chan []raftpb.Entry
done chan struct{}
}
// newStreamServer starts and returns a new started stream server.
// The caller should call stop when finished, to shut it down.
func newStreamWriter(w WriteFlusher, to types.ID, term uint64) *streamWriter {
s := &streamWriter{
to: to,
term: term,
q: make(chan []raftpb.Entry, streamBufSize),
done: make(chan struct{}),
}
go s.handle(w)
return s
}
func (s *streamWriter) send(ents []raftpb.Entry) error {
select {
case <-s.done:
return fmt.Errorf("stopped")
default:
}
select {
case s.q <- ents:
return nil
default:
log.Printf("rafthttp: maximum number of stream buffer entries to %d has been reached", s.to)
return fmt.Errorf("maximum number of stream buffer entries has been reached")
}
}
func (s *streamWriter) handle(w WriteFlusher) {
defer func() {
close(s.done)
log.Printf("rafthttp: server streaming to %s at term %d has been stopped", s.to, s.term)
}()
ew := &entryWriter{w: w}
for ents := range s.q {
start := time.Now()
if err := ew.writeEntries(ents); err != nil {
log.Printf("rafthttp: encountered error writing to server log stream: %v", err)
return
}
w.Flush()
s.fs.Succ(time.Since(start))
}
}
func (s *streamWriter) stop() {
close(s.q)
<-s.done
}
func (s *streamWriter) stopNotify() <-chan struct{} { return s.done }
// TODO: move the raft interface out of the reader.
type streamReader struct {
id types.ID
to types.ID
term uint64
r Raft
closer io.Closer
done chan struct{}
}
// newStreamClient starts and returns a new started stream client.
// The caller should call stop when finished, to shut it down.
func newStreamReader(id, to, cid types.ID, term uint64, tr http.RoundTripper, u string, r Raft) (*streamReader, error) {
s := &streamReader{
id: id,
to: to,
term: term,
r: r,
done: make(chan struct{}),
}
uu, err := url.Parse(u)
if err != nil {
return nil, fmt.Errorf("parse url %s error: %v", u, err)
}
uu.Path = path.Join(RaftStreamPrefix, s.id.String())
req, err := http.NewRequest("GET", uu.String(), nil)
if err != nil {
return nil, fmt.Errorf("new request to %s error: %v", u, err)
}
req.Header.Set("X-Etcd-Cluster-ID", cid.String())
req.Header.Set("X-Raft-To", s.to.String())
req.Header.Set("X-Raft-Term", strconv.FormatUint(s.term, 10))
resp, err := tr.RoundTrip(req)
if err != nil {
return nil, fmt.Errorf("error posting to %q: %v", u, err)
}
if resp.StatusCode != http.StatusOK {
resp.Body.Close()
return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
}
s.closer = resp.Body
go s.handle(resp.Body)
log.Printf("rafthttp: starting client stream to %s at term %d", s.to, s.term)
return s, nil
}
func (s *streamReader) stop() {
s.closer.Close()
<-s.done
}
func (s *streamReader) isStopped() bool {
select {
case <-s.done:
return true
default:
return false
}
}
func (s *streamReader) handle(r io.Reader) {
defer func() {
close(s.done)
log.Printf("rafthttp: client streaming to %s at term %d has been stopped", s.to, s.term)
}()
er := &entryReader{r: r}
for {
ents, err := er.readEntries()
if err != nil {
if err != io.EOF {
log.Printf("rafthttp: encountered error reading the client log stream: %v", err)
}
return
}
// Considering Commit in MsgApp is not recovered, zero-entry appendEntry
// messages have no use to raft state machine. Drop it here because
// we don't have easy way to recover its Index easily.
if len(ents) == 0 {
continue
}
// The commit index field in appendEntry message is not recovered.
// The follower updates its commit index through heartbeat.
msg := raftpb.Message{
Type: raftpb.MsgApp,
From: uint64(s.to),
To: uint64(s.id),
Term: s.term,
LogTerm: s.term,
Index: ents[0].Index - 1,
Entries: ents,
}
if err := s.r.Process(context.TODO(), msg); err != nil {
log.Printf("rafthttp: process raft message error: %v", err)
return
}
}
}
func shouldInitStream(m raftpb.Message) bool {
return m.Type == raftpb.MsgAppResp && m.Reject == false
}
func canUseStream(m raftpb.Message) bool {
return m.Type == raftpb.MsgApp && m.Index > 0 && m.Term == m.LogTerm
}