diff --git a/server/peer_server_handlers.go b/server/peer_server_handlers.go index be665dbf5..104447f90 100644 --- a/server/peer_server_handlers.go +++ b/server/peer_server_handlers.go @@ -23,72 +23,115 @@ func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request // Response to vote request func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) { rvreq := &raft.RequestVoteRequest{} - err := decodeJsonRequest(req, rvreq) - if err == nil { - log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName) - if resp := ps.raftServer.RequestVote(rvreq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } + + if _, err := rvreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err) + return + } + + log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName) + + resp := ps.raftServer.RequestVote(rvreq) + + if resp == nil { + log.Warn("[vote] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + log.Warn("[vote] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return } - log.Warnf("[vote] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) } // Response to append entries request func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) { aereq := &raft.AppendEntriesRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries)) - - ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) - - if resp := ps.raftServer.AppendEntries(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - if !resp.Success { - log.Debugf("[Append Entry] Step back") - } - return - } + if _, err := aereq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err) + return + } + + log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries)) + + ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength)) + + resp := ps.raftServer.AppendEntries(aereq) + + if resp == nil { + log.Warn("[ae] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if !resp.Success { + log.Debugf("[Append Entry] Step back") + } + + if _, err := resp.Encode(w); err != nil { + log.Warn("[ae] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return } - log.Warnf("[Append Entry] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) } // Response to recover from snapshot request func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - log.Debugf("[recv] POST %s/snapshot/ ", ps.url) - if resp := ps.raftServer.RequestSnapshot(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } + ssreq := &raft.SnapshotRequest{} + + if _, err := ssreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err) + return + } + + log.Debugf("[recv] POST %s/snapshot", ps.url) + + resp := ps.raftServer.RequestSnapshot(ssreq) + + if resp == nil { + log.Warn("[ss] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + log.Warn("[ss] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return } - log.Warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) } // Response to recover from snapshot request func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) { - aereq := &raft.SnapshotRecoveryRequest{} - err := decodeJsonRequest(req, aereq) - if err == nil { - log.Debugf("[recv] POST %s/snapshotRecovery/ ", ps.url) - if resp := ps.raftServer.SnapshotRecoveryRequest(aereq); resp != nil { - w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(resp) - return - } + ssrreq := &raft.SnapshotRecoveryRequest{} + + if _, err := ssrreq.Decode(req.Body); err != nil { + http.Error(w, "", http.StatusBadRequest) + log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err) + return + } + + log.Debugf("[recv] POST %s/snapshotRecovery", ps.url) + + resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq) + + if resp == nil { + log.Warn("[ssr] Error: nil response") + http.Error(w, "", http.StatusInternalServerError) + return + } + + if _, err := resp.Encode(w); err != nil { + log.Warn("[ssr] Error: %v", err) + http.Error(w, "", http.StatusInternalServerError) + return } - log.Warnf("[Snapshot] ERROR: %v", err) - w.WriteHeader(http.StatusInternalServerError) } // Get the port that listening for etcd connecting of the server diff --git a/server/server.go b/server/server.go index b88d5ac0a..48757ad2b 100644 --- a/server/server.go +++ b/server/server.go @@ -116,6 +116,7 @@ func (s *Server) installV2() { s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET") s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET") s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET") + s.handleFunc("/v2/speedTest", s.SpeedTestHandler).Methods("GET") } // Adds a v1 server handler to the router. diff --git a/server/transporter.go b/server/transporter.go index 95545a827..d844dc33e 100644 --- a/server/transporter.go +++ b/server/transporter.go @@ -3,7 +3,6 @@ package server import ( "bytes" "crypto/tls" - "encoding/json" "fmt" "io" "net" @@ -65,10 +64,12 @@ func dialWithTimeout(network, addr string) (net.Conn, error) { // Sends AppendEntries RPCs to a peer when the server is the leader. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse { - var aersp *raft.AppendEntriesResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + if _, err := req.Encode(&b); err != nil { + log.Warn("transporter.ae.encoding.error:", err) + return nil + } size := b.Len() @@ -97,6 +98,7 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe if ok { thisFollowerStats.Fail() } + return nil } else { if ok { thisFollowerStats.Succ(end.Sub(start)) @@ -108,21 +110,25 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe t.CancelWhenTimeout(httpRequest) - aersp = &raft.AppendEntriesResponse{} - if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp + aeresp := &raft.AppendEntriesResponse{} + if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF { + log.Warn("transporter.ae.decoding.error:", err) + return nil } - + return aeresp } - return aersp + return nil } // Sends RequestVote RPCs to a peer when the server is the candidate. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse { - var rvrsp *raft.RequestVoteResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + log.Warn("transporter.vr.encoding.error:", err) + return nil + } u, _ := t.peerServer.registry.PeerURL(peer.Name) log.Debugf("Send Vote from %s to %s", server.Name(), u) @@ -139,28 +145,31 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req * t.CancelWhenTimeout(httpRequest) rvrsp := &raft.RequestVoteResponse{} - if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF { - return rvrsp + if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF { + log.Warn("transporter.vr.decoding.error:", err) + return nil } - + return rvrsp } - return rvrsp + return nil } // Sends SnapshotRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse { - var aersp *raft.SnapshotResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + log.Warn("transporter.ss.encoding.error:", err) + return nil + } u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u) resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b) if err != nil { - log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err) + log.Debugf("Cannot send Snapshot Request to %s : %s", u, err) } if resp != nil { @@ -168,42 +177,48 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r t.CancelWhenTimeout(httpRequest) - aersp = &raft.SnapshotResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - - return aersp + ssrsp := &raft.SnapshotResponse{} + if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF { + log.Warn("transporter.ss.decoding.error:", err) + return nil } + return ssrsp } - - return aersp + return nil } // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse { - var aersp *raft.SnapshotRecoveryResponse var b bytes.Buffer - json.NewEncoder(&b).Encode(req) + + if _, err := req.Encode(&b); err != nil { + log.Warn("transporter.ss.encoding.error:", err) + return nil + } u, _ := t.peerServer.registry.PeerURL(peer.Name) - log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u, - req.LastTerm, req.LastIndex) + log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u) - resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) + resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b) if err != nil { - log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err) + log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err) } if resp != nil { defer resp.Body.Close() - aersp = &raft.SnapshotRecoveryResponse{} - if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF { - return aersp + t.CancelWhenTimeout(httpRequest) + + ssrrsp := &raft.SnapshotRecoveryResponse{} + if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF { + log.Warn("transporter.ssr.decoding.error:", err) + return nil } + return ssrrsp } + return nil - return aersp } // Send server side POST request