Merge pull request #1948 from xiang90/stats

etcdserver: fix leader stats
release-2.0
Xiang Li 2014-12-15 17:10:48 -08:00
commit 722247a752
5 changed files with 36 additions and 12 deletions

View File

@ -260,8 +260,13 @@ func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
stats := h.stats.LeaderStats()
if stats == nil {
writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(h.stats.LeaderStats())
w.Write(stats)
}
func serveVersion(w http.ResponseWriter, r *http.Request) {

View File

@ -385,6 +385,11 @@ func (s *EtcdServer) run() {
atomic.StoreUint64(&s.raftLead, rd.SoftState.Lead)
if rd.RaftState == raft.StateLeader {
syncC = s.SyncTicker
// TODO: remove the nil checking
// current test utility does not provide the stats
if s.stats != nil {
s.stats.BecomeLeader()
}
} else {
syncC = nil
}
@ -526,7 +531,10 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
func (s *EtcdServer) LeaderStats() []byte {
// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
lead := atomic.LoadUint64(&s.raftLead)
if lead != uint64(s.id) {
return nil
}
return s.lstats.JSON()
}

View File

@ -141,3 +141,11 @@ func (ss *ServerStats) SendAppendReq(reqSize int) {
ss.SendAppendRequestCnt++
}
func (ss *ServerStats) BecomeLeader() {
if ss.State != raft.StateLeader {
ss.State = raft.StateLeader
ss.LeaderInfo.Name = ss.ID
ss.LeaderInfo.StartTime = time.Now()
}
}

View File

@ -77,7 +77,7 @@ func NewSender(tr http.RoundTripper, u string, id types.ID, cid types.ID, p Proc
shouldstop: shouldstop,
batcher: NewBatcher(100, appRespBatchMs*time.Millisecond),
propBatcher: NewProposalBatcher(100, propBatchMs*time.Millisecond),
q: make(chan []byte, senderBufSize),
q: make(chan *raftpb.Message, senderBufSize),
}
s.wg.Add(connPerSender)
for i := 0; i < connPerSender; i++ {
@ -98,7 +98,7 @@ type sender struct {
strmCln *streamClient
batcher *Batcher
propBatcher *ProposalBatcher
q chan []byte
q chan *raftpb.Message
strmSrvMu sync.Mutex
strmSrv *streamServer
@ -184,9 +184,8 @@ func (s *sender) Send(m raftpb.Message) error {
func (s *sender) send(m raftpb.Message) error {
// TODO: don't block. we should be able to have 1000s
// of messages out at a time.
data := pbutil.MustMarshal(&m)
select {
case s.q <- data:
case s.q <- &m:
return nil
default:
log.Printf("sender: dropping %s because maximal number %d of sender buffer entries to %s has been reached",
@ -267,9 +266,9 @@ func (s *sender) tryStream(m raftpb.Message) bool {
func (s *sender) handle() {
defer s.wg.Done()
for d := range s.q {
for m := range s.q {
start := time.Now()
err := s.post(d)
err := s.post(pbutil.MustMarshal(m))
end := time.Now()
s.mu.Lock()
@ -282,14 +281,18 @@ func (s *sender) handle() {
log.Printf("sender: the connection with %s becomes inactive", s.id)
s.active = false
}
s.fs.Fail()
if m.Type == raftpb.MsgApp {
s.fs.Fail()
}
} else {
if !s.active {
log.Printf("sender: the connection with %s becomes active", s.id)
s.active = true
s.errored = nil
}
s.fs.Succ(end.Sub(start))
if m.Type == raftpb.MsgApp {
s.fs.Succ(end.Sub(start))
}
}
s.mu.Unlock()
}

View File

@ -36,7 +36,7 @@ func TestSenderSend(t *testing.T) {
fs := &stats.FollowerStats{}
s := NewSender(tr, "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
if err := s.Send(raftpb.Message{}); err != nil {
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
t.Fatalf("unexpect send error: %v", err)
}
s.Stop()
@ -88,7 +88,7 @@ func TestSenderSendFailed(t *testing.T) {
fs := &stats.FollowerStats{}
s := NewSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), types.ID(1), &nopProcessor{}, fs, nil)
if err := s.Send(raftpb.Message{}); err != nil {
if err := s.Send(raftpb.Message{Type: raftpb.MsgApp}); err != nil {
t.Fatalf("unexpect Send error: %v", err)
}
s.Stop()