From 399e3cdf819d6a7ca1cebc080d0bb1623837ef76 Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 23 Feb 2015 00:15:45 -0800 Subject: [PATCH] rafthttp: add stream http tests --- rafthttp/http.go | 20 +++-- rafthttp/http_test.go | 183 ++++++++++++++++++++++++++++++++++++++++++ rafthttp/peer.go | 20 ++++- rafthttp/transport.go | 2 +- 4 files changed, 214 insertions(+), 11 deletions(-) diff --git a/rafthttp/http.go b/rafthttp/http.go index 036abc5ca..70baedf33 100644 --- a/rafthttp/http.go +++ b/rafthttp/http.go @@ -42,11 +42,15 @@ func NewHandler(r Raft, cid types.ID) http.Handler { } } -func newStreamHandler(tr *transport, id, cid types.ID) http.Handler { +type peerGetter interface { + Get(id types.ID) Peer +} + +func newStreamHandler(peerGetter peerGetter, id, cid types.ID) http.Handler { return &streamHandler{ - tr: tr, - id: id, - cid: cid, + peerGetter: peerGetter, + id: id, + cid: cid, } } @@ -107,9 +111,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } type streamHandler struct { - tr *transport - id types.ID - cid types.ID + peerGetter peerGetter + id types.ID + cid types.ID } func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -141,7 +145,7 @@ func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { http.Error(w, "invalid from", http.StatusNotFound) return } - p := h.tr.Peer(from) + p := h.peerGetter.Get(from) if p == nil { log.Printf("rafthttp: fail to find sender %s", from) http.Error(w, "error sender not found", http.StatusNotFound) diff --git a/rafthttp/http_test.go b/rafthttp/http_test.go index 903c87a7e..72f24df64 100644 --- a/rafthttp/http_test.go +++ b/rafthttp/http_test.go @@ -22,6 +22,7 @@ import ( "net/http/httptest" "strings" "testing" + "time" "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" "github.com/coreos/etcd/pkg/pbutil" @@ -155,6 +156,165 @@ func TestServeRaftPrefix(t *testing.T) { } } +func TestServeRaftStreamPrefix(t *testing.T) { + tests := []struct { + path string + wtype streamType + }{ + { + RaftStreamPrefix + "/message/1", + streamTypeMessage, + }, + { + RaftStreamPrefix + "/msgapp/1", + streamTypeMsgApp, + }, + // backward compatibility + { + RaftStreamPrefix + "/1", + streamTypeMsgApp, + }, + } + for i, tt := range tests { + req, err := http.NewRequest("GET", "http://localhost:7001"+tt.path, nil) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", "1") + req.Header.Set("X-Raft-To", "2") + wterm := "1" + req.Header.Set("X-Raft-Term", wterm) + + peer := newFakePeer() + peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): peer}} + h := newStreamHandler(peerGetter, types.ID(2), types.ID(1)) + + rw := httptest.NewRecorder() + go h.ServeHTTP(rw, req) + + var conn *outgoingConn + select { + case conn = <-peer.connc: + case <-time.After(time.Second): + t.Fatalf("#%d: failed to attach outgoingConn", i) + } + if conn.t != tt.wtype { + t.Errorf("$%d: type = %s, want %s", i, conn.t, tt.wtype) + } + if conn.termStr != wterm { + t.Errorf("$%d: term = %s, want %s", i, conn.termStr, wterm) + } + conn.Close() + } +} + +func TestServeRaftStreamPrefixBad(t *testing.T) { + tests := []struct { + method string + path string + clusterID string + remote string + + wcode int + }{ + // bad method + { + "PUT", + RaftStreamPrefix + "/message/1", + "1", + "1", + http.StatusMethodNotAllowed, + }, + // bad method + { + "POST", + RaftStreamPrefix + "/message/1", + "1", + "1", + http.StatusMethodNotAllowed, + }, + // bad method + { + "DELETE", + RaftStreamPrefix + "/message/1", + "1", + "1", + http.StatusMethodNotAllowed, + }, + // bad path + { + "GET", + RaftStreamPrefix + "/strange/1", + "1", + "1", + http.StatusNotFound, + }, + // bad path + { + "GET", + RaftStreamPrefix + "/strange", + "1", + "1", + http.StatusNotFound, + }, + // non-existant peer + { + "GET", + RaftStreamPrefix + "/message/2", + "1", + "1", + http.StatusNotFound, + }, + // wrong cluster ID + { + "GET", + RaftStreamPrefix + "/message/1", + "2", + "1", + http.StatusPreconditionFailed, + }, + // wrong remote id + { + "GET", + RaftStreamPrefix + "/message/1", + "1", + "2", + http.StatusPreconditionFailed, + }, + } + for i, tt := range tests { + req, err := http.NewRequest(tt.method, "http://localhost:7001"+tt.path, nil) + if err != nil { + t.Fatalf("#%d: could not create request: %#v", i, err) + } + req.Header.Set("X-Etcd-Cluster-ID", tt.clusterID) + req.Header.Set("X-Raft-To", tt.remote) + rw := httptest.NewRecorder() + peerGetter := &fakePeerGetter{peers: map[types.ID]Peer{types.ID(1): newFakePeer()}} + h := newStreamHandler(peerGetter, types.ID(1), types.ID(1)) + h.ServeHTTP(rw, req) + + if rw.Code != tt.wcode { + t.Errorf("#%d: code = %d, want %d", i, rw.Code, tt.wcode) + } + } +} + +func TestCloseNotifier(t *testing.T) { + c := newCloseNotifier() + select { + case <-c.closeNotify(): + t.Fatalf("received unexpected close notification") + default: + } + c.Close() + select { + case <-c.closeNotify(): + default: + t.Fatalf("failed to get close notification") + } +} + // errReader implements io.Reader to facilitate a broken request. type errReader struct{} @@ -180,3 +340,26 @@ type resWriterToError struct { func (e *resWriterToError) Error() string { return "" } func (e *resWriterToError) WriteTo(w http.ResponseWriter) { w.WriteHeader(e.code) } + +type fakePeerGetter struct { + peers map[types.ID]Peer +} + +func (pg *fakePeerGetter) Get(id types.ID) Peer { return pg.peers[id] } + +type fakePeer struct { + msgs []raftpb.Message + u string + connc chan *outgoingConn +} + +func newFakePeer() *fakePeer { + return &fakePeer{ + connc: make(chan *outgoingConn, 1), + } +} + +func (pr *fakePeer) Send(m raftpb.Message) { pr.msgs = append(pr.msgs, m) } +func (pr *fakePeer) Update(u string) { pr.u = u } +func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn } +func (pr *fakePeer) Stop() {} diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 4f265e029..a78503d9c 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -33,6 +33,24 @@ const ( recvBufSize = 4096 ) +type Peer interface { + // Send sends the message to the remote peer. The function is non-blocking + // and has no promise that the message will be received by the remote. + // When it fails to send message out, it will report the status to underlying + // raft. + Send(m raftpb.Message) + // Update updates the urls of remote peer. + Update(u string) + // attachOutgoingConn attachs the outgoing connection to the peer for + // stream usage. After the call, the ownership of the outgoing + // connection hands over to the peer. The peer will close the connection + // when it is no longer used. + attachOutgoingConn(conn *outgoingConn) + // Stop performs any necessary finalization and terminates the peer + // elegantly. + Stop() +} + // peer is the representative of a remote raft node. Local raft node sends // messages to the remote through peer. // Each peer has two underlying mechanisms to send out a message: stream and @@ -171,8 +189,6 @@ func (p *peer) Resume() { } } -// Stop performs any necessary finalization and terminates the peer -// elegantly. func (p *peer) Stop() { close(p.stopc) <-p.done diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 3486ac913..5d931b2c9 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -79,7 +79,7 @@ func (t *transport) Handler() http.Handler { return mux } -func (t *transport) Peer(id types.ID) *peer { +func (t *transport) Get(id types.ID) Peer { t.mu.RLock() defer t.mu.RUnlock() return t.peers[id]