Merge pull request #1152 from jonboulle/1152_raft_headers
Missing X-Raft-Term & X-Raft-Index headersrelease-2.0
commit
9c9437a9e7
|
@ -35,10 +35,11 @@ const (
|
|||
var errClosed = errors.New("etcdhttp: client closed connection")
|
||||
|
||||
// NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
|
||||
func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
|
||||
func NewClientHandler(server *etcdserver.EtcdServer, peers Peers, timeout time.Duration) http.Handler {
|
||||
sh := &serverHandler{
|
||||
server: server,
|
||||
peers: peers,
|
||||
timer: server,
|
||||
timeout: timeout,
|
||||
}
|
||||
if sh.timeout == 0 {
|
||||
|
@ -69,6 +70,7 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
|
|||
type serverHandler struct {
|
||||
timeout time.Duration
|
||||
server etcdserver.Server
|
||||
timer etcdserver.RaftTimer
|
||||
peers Peers
|
||||
}
|
||||
|
||||
|
@ -94,14 +96,14 @@ func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
switch {
|
||||
case resp.Event != nil:
|
||||
if err := writeEvent(w, resp.Event); err != nil {
|
||||
if err := writeEvent(w, resp.Event, h.timer); err != nil {
|
||||
// Should never be reached
|
||||
log.Println("error writing event: %v", err)
|
||||
log.Printf("error writing event: %v", err)
|
||||
}
|
||||
case resp.Watcher != nil:
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
|
||||
defer cancel()
|
||||
handleWatch(ctx, w, resp.Watcher, rr.Stream)
|
||||
handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
|
||||
default:
|
||||
writeError(w, errors.New("received response with no Event/Watcher!"))
|
||||
}
|
||||
|
@ -325,12 +327,14 @@ func writeError(w http.ResponseWriter, err error) {
|
|||
// writeEvent serializes a single Event and writes the resulting
|
||||
// JSON to the given ResponseWriter, along with the appropriate
|
||||
// headers
|
||||
func writeEvent(w http.ResponseWriter, ev *store.Event) error {
|
||||
func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
|
||||
if ev == nil {
|
||||
return errors.New("cannot write empty Event!")
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
|
||||
w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
|
||||
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
|
||||
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
|
||||
|
||||
if ev.IsCreated() {
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
|
@ -339,7 +343,7 @@ func writeEvent(w http.ResponseWriter, ev *store.Event) error {
|
|||
return json.NewEncoder(w).Encode(ev)
|
||||
}
|
||||
|
||||
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
|
||||
func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
|
||||
defer wa.Remove()
|
||||
ech := wa.EventChan()
|
||||
var nch <-chan bool
|
||||
|
@ -348,6 +352,8 @@ func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, s
|
|||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
|
||||
w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
// Ensure headers are flushed early, in case of long polling
|
||||
|
|
|
@ -499,10 +499,15 @@ func TestWriteError(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
type dummyRaftTimer struct{}
|
||||
|
||||
func (drt dummyRaftTimer) Index() int64 { return int64(100) }
|
||||
func (drt dummyRaftTimer) Term() int64 { return int64(5) }
|
||||
|
||||
func TestWriteEvent(t *testing.T) {
|
||||
// nil event should not panic
|
||||
rw := httptest.NewRecorder()
|
||||
writeEvent(rw, nil)
|
||||
writeEvent(rw, nil, dummyRaftTimer{})
|
||||
h := rw.Header()
|
||||
if len(h) > 0 {
|
||||
t.Fatalf("unexpected non-empty headers: %#v", h)
|
||||
|
@ -545,10 +550,16 @@ func TestWriteEvent(t *testing.T) {
|
|||
|
||||
for i, tt := range tests {
|
||||
rw := httptest.NewRecorder()
|
||||
writeEvent(rw, tt.ev)
|
||||
writeEvent(rw, tt.ev, dummyRaftTimer{})
|
||||
if gct := rw.Header().Get("Content-Type"); gct != "application/json" {
|
||||
t.Errorf("case %d: bad Content-Type: got %q, want application/json", i, gct)
|
||||
}
|
||||
if gri := rw.Header().Get("X-Raft-Index"); gri != "100" {
|
||||
t.Errorf("case %d: bad X-Raft-Index header: got %s, want %s", i, gri, "100")
|
||||
}
|
||||
if grt := rw.Header().Get("X-Raft-Term"); grt != "5" {
|
||||
t.Errorf("case %d: bad X-Raft-Term header: got %s, want %s", i, grt, "5")
|
||||
}
|
||||
if gei := rw.Header().Get("X-Etcd-Index"); gei != tt.idx {
|
||||
t.Errorf("case %d: bad X-Etcd-Index header: got %s, want %s", i, gei, tt.idx)
|
||||
}
|
||||
|
@ -970,6 +981,7 @@ func TestServeKeysEvent(t *testing.T) {
|
|||
timeout: time.Hour,
|
||||
server: server,
|
||||
peers: nil,
|
||||
timer: &dummyRaftTimer{},
|
||||
}
|
||||
rw := httptest.NewRecorder()
|
||||
|
||||
|
@ -1008,6 +1020,7 @@ func TestServeKeysWatch(t *testing.T) {
|
|||
timeout: time.Hour,
|
||||
server: server,
|
||||
peers: nil,
|
||||
timer: &dummyRaftTimer{},
|
||||
}
|
||||
go func() {
|
||||
ec <- &store.Event{
|
||||
|
@ -1047,10 +1060,12 @@ func TestHandleWatch(t *testing.T) {
|
|||
Node: &store.NodeExtern{},
|
||||
}
|
||||
|
||||
handleWatch(context.Background(), rw, wa, false)
|
||||
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
|
||||
|
||||
wcode := http.StatusOK
|
||||
wct := "application/json"
|
||||
wri := "100"
|
||||
wrt := "5"
|
||||
wbody := mustMarshalEvent(
|
||||
t,
|
||||
&store.Event{
|
||||
|
@ -1066,6 +1081,12 @@ func TestHandleWatch(t *testing.T) {
|
|||
if ct := h.Get("Content-Type"); ct != wct {
|
||||
t.Errorf("Content-Type=%q, want %q", ct, wct)
|
||||
}
|
||||
if ri := h.Get("X-Raft-Index"); ri != wri {
|
||||
t.Errorf("X-Raft-Index=%q, want %q", ri, wri)
|
||||
}
|
||||
if rt := h.Get("X-Raft-Term"); rt != wrt {
|
||||
t.Errorf("X-Raft-Term=%q, want %q", rt, wrt)
|
||||
}
|
||||
g := rw.Body.String()
|
||||
if g != wbody {
|
||||
t.Errorf("got body=%#v, want %#v", g, wbody)
|
||||
|
@ -1079,7 +1100,7 @@ func TestHandleWatchNoEvent(t *testing.T) {
|
|||
}
|
||||
close(wa.echan)
|
||||
|
||||
handleWatch(context.Background(), rw, wa, false)
|
||||
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
|
||||
|
||||
wcode := http.StatusOK
|
||||
wct := "application/json"
|
||||
|
@ -1115,7 +1136,7 @@ func TestHandleWatchCloseNotified(t *testing.T) {
|
|||
rw.cn <- true
|
||||
wa := &dummyWatcher{}
|
||||
|
||||
handleWatch(context.Background(), rw, wa, false)
|
||||
handleWatch(context.Background(), rw, wa, false, dummyRaftTimer{})
|
||||
|
||||
wcode := http.StatusOK
|
||||
wct := "application/json"
|
||||
|
@ -1141,7 +1162,7 @@ func TestHandleWatchTimeout(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
|
||||
handleWatch(ctx, rw, wa, false)
|
||||
handleWatch(ctx, rw, wa, false, dummyRaftTimer{})
|
||||
|
||||
wcode := http.StatusOK
|
||||
wct := "application/json"
|
||||
|
@ -1184,7 +1205,7 @@ func TestHandleWatchStreaming(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
handleWatch(ctx, rw, wa, true)
|
||||
handleWatch(ctx, rw, wa, true, dummyRaftTimer{})
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"errors"
|
||||
"log"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
|
||||
|
@ -66,6 +67,11 @@ type Server interface {
|
|||
Process(ctx context.Context, m raftpb.Message) error
|
||||
}
|
||||
|
||||
type RaftTimer interface {
|
||||
Index() int64
|
||||
Term() int64
|
||||
}
|
||||
|
||||
// EtcdServer is the production implementation of the Server interface
|
||||
type EtcdServer struct {
|
||||
w wait.Wait
|
||||
|
@ -86,6 +92,10 @@ type EtcdServer struct {
|
|||
SyncTicker <-chan time.Time
|
||||
|
||||
SnapCount int64 // number of entries to trigger a snapshot
|
||||
|
||||
// Cache of the latest raft index and raft term the server has seen
|
||||
raftIndex int64
|
||||
raftTerm int64
|
||||
}
|
||||
|
||||
// Start prepares and starts server in a new goroutine. It is no longer safe to
|
||||
|
@ -138,6 +148,8 @@ func (s *EtcdServer) run() {
|
|||
default:
|
||||
panic("unexpected entry type")
|
||||
}
|
||||
atomic.StoreInt64(&s.raftIndex, e.Index)
|
||||
atomic.StoreInt64(&s.raftTerm, e.Term)
|
||||
appliedi = e.Index
|
||||
}
|
||||
|
||||
|
@ -249,6 +261,15 @@ func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
|
|||
return s.configure(ctx, cc)
|
||||
}
|
||||
|
||||
// Implement the RaftTimer interface
|
||||
func (s *EtcdServer) Index() int64 {
|
||||
return atomic.LoadInt64(&s.raftIndex)
|
||||
}
|
||||
|
||||
func (s *EtcdServer) Term() int64 {
|
||||
return atomic.LoadInt64(&s.raftTerm)
|
||||
}
|
||||
|
||||
// configure sends configuration change through consensus then performs it.
|
||||
// It will block until the change is performed or there is an error.
|
||||
func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
|
||||
|
|
Loading…
Reference in New Issue