Merge pull request #1285 from jonboulle/stats_leader_followers

etcdserver: add StoreStats, ServerStats and LeaderStats
release-2.0
Jonathan Boulle 2014-10-16 10:44:48 -07:00
commit 6a30d3ba04
8 changed files with 529 additions and 8 deletions

View File

@ -6,8 +6,12 @@ import (
"fmt"
"log"
"net/http"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
@ -103,19 +107,25 @@ func (s *clusterStore) Remove(id uint64) {
}
}
func Sender(t *http.Transport, cls ClusterStore) func(msgs []raftpb.Message) {
// Sender creates the default production sender used to transport raft messages
// in the cluster. The returned sender will update the given ServerStats and
// LeaderStats appropriately.
func Sender(t *http.Transport, cls ClusterStore, ss *stats.ServerStats, ls *stats.LeaderStats) func(msgs []raftpb.Message) {
c := &http.Client{Transport: t}
return func(msgs []raftpb.Message) {
for _, m := range msgs {
// TODO: reuse go routines
// limit the number of outgoing connections for the same receiver
go send(c, cls, m)
go send(c, cls, m, ss, ls)
}
}
}
func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
// send uses the given client to send a message to a member in the given
// ClusterStore, retrying up to 3 times for each message. The given
// ServerStats and LeaderStats are updated appropriately.
func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerStats, ls *stats.LeaderStats) {
// TODO (xiangli): reasonable retry logic
for i := 0; i < 3; i++ {
u := cls.Get().Pick(m.To)
@ -126,7 +136,6 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
log.Printf("etcdhttp: no addr for %d", m.To)
return
}
u = fmt.Sprintf("%s%s", u, raftPrefix)
// TODO: don't block. we should be able to have 1000s
@ -136,13 +145,31 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message) {
log.Println("etcdhttp: dropping message:", err)
return // drop bad message
}
if httpPost(c, u, data) {
return // success
if m.Type == raftpb.MsgApp {
ss.SendAppendReq(len(data))
}
to := strconv.FormatUint(m.To, 16)
fs, ok := ls.Followers[to]
if !ok {
fs = &stats.FollowerStats{}
fs.Latency.Minimum = 1 << 63
ls.Followers[to] = fs
}
start := time.Now()
sent := httpPost(c, u, data)
end := time.Now()
if sent {
fs.Succ(end.Sub(start))
return
}
fs.Fail()
// TODO: backoff
}
}
// httpPost POSTs a data payload to a url using the given client. Returns true
// if the POST succeeds, false on any failure.
func httpPost(c *http.Client, url string, data []byte) bool {
resp, err := c.Post(url, "application/protobuf", bytes.NewBuffer(data))
if err != nil {

View File

@ -25,6 +25,7 @@ const (
deprecatedMachinesPrefix = "/v2/machines"
adminMembersPrefix = "/v2/admin/members/"
raftPrefix = "/raft"
statsPrefix = "/v2/stats"
// time to wait for response from EtcdServer requests
defaultServerTimeout = 500 * time.Millisecond
@ -40,12 +41,16 @@ func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
sh := &serverHandler{
server: server,
clusterStore: server.ClusterStore,
stats: server,
timer: server,
timeout: defaultServerTimeout,
}
mux := http.NewServeMux()
mux.HandleFunc(keysPrefix, sh.serveKeys)
mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
// TODO: dynamic configuration may make this outdated. take care of it.
// TODO: dynamic configuration may introduce race also.
// TODO: add serveMembers
@ -70,6 +75,8 @@ func NewPeerHandler(server etcdserver.Server) http.Handler {
// serverHandler carries the dependencies shared by the HTTP handler methods
// defined on it.
type serverHandler struct {
// timeout is the time to wait for a response from EtcdServer requests.
timeout time.Duration
server etcdserver.Server
// stats backs the "self" and "leader" statistics endpoints.
stats etcdserver.ServerStats
// storestats backs the "store" statistics endpoint.
storestats etcdserver.StoreStats
timer etcdserver.RaftTimer
clusterStore etcdserver.ClusterStore
}
@ -162,6 +169,44 @@ func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request)
}
}
// serveStoreStats handles requests for the store statistics endpoint. Only
// GET is accepted; allowMethod is responsible for rejecting anything else.
// On success the pre-encoded JSON returned by the handler's StoreStats is
// written to the response with a JSON content type.
func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
	if !allowMethod(w, r.Method, "GET") {
		return
	}
	// storestats is an interface field and a handler can be constructed
	// without it. Calling JSON() on a nil interface would panic in the middle
	// of the request, so report a server error instead.
	if h.storestats == nil {
		log.Println("etcdhttp: store stats requested but no StoreStats is configured")
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.Write(h.storestats.JSON())
}
// serveSelfStats answers GET requests with the JSON encoding of the value
// returned by the handler's SelfStats. If encoding fails the error is logged
// and a 500 response is sent.
func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
	if allowed := allowMethod(w, r.Method, "GET"); !allowed {
		return
	}

	body, err := json.Marshal(h.stats.SelfStats())
	if err != nil {
		log.Printf("error marshalling stats: %v\n", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.Write(body)
}
// serveLeaderStats answers GET requests with the JSON encoding of the value
// returned by the handler's LeaderStats. If encoding fails the error is
// logged and a 500 response is sent.
func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
	if allowed := allowMethod(w, r.Method, "GET"); !allowed {
		return
	}

	body, err := json.Marshal(h.stats.LeaderStats())
	if err != nil {
		log.Printf("error marshalling stats: %v\n", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}

	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.Write(body)
}
func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "POST") {
return
@ -180,6 +225,10 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
return
}
log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
if m.Type == raftpb.MsgApp {
// TODO(jonboulle): standardize id uint-->string process: always base 16?
h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength))
}
if err := h.server.Process(context.TODO(), m); err != nil {
log.Println("etcdhttp: error processing raft message:", err)
writeError(w, err)

View File

@ -19,6 +19,7 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
@ -633,10 +634,131 @@ func TestServeMachines(t *testing.T) {
t.Errorf("body = %s, want %s", g, w)
}
if writer.Code != http.StatusOK {
t.Errorf("header = %d, want %d", writer.Code, http.StatusOK)
t.Errorf("code = %d, want %d", writer.Code, http.StatusOK)
}
}
// dummyServerStats is a test double that hands back whatever ServerStats and
// LeaderStats values it was constructed with.
type dummyServerStats struct {
	ss *stats.ServerStats
	ls *stats.LeaderStats
}

// SelfStats returns the canned ServerStats.
func (dss *dummyServerStats) SelfStats() *stats.ServerStats {
	return dss.ss
}

// LeaderStats returns the canned LeaderStats.
func (dss *dummyServerStats) LeaderStats() *stats.LeaderStats {
	return dss.ls
}
// TestServeSelfStats checks that serveSelfStats responds to a GET with
// status 200, a JSON content type, and a body equal to the JSON encoding of
// the ServerStats supplied by the stats provider.
func TestServeSelfStats(t *testing.T) {
	ss := &stats.ServerStats{
		Name:           "foobar",
		RecvingPkgRate: 123.4,
	}
	w, err := json.Marshal(ss)
	if err != nil {
		// Fatalf, not Fatal: the message contains a formatting verb.
		t.Fatalf("error marshaling: %v", err)
	}
	sh := &serverHandler{
		stats: &dummyServerStats{
			ss: ss,
		},
	}
	rw := httptest.NewRecorder()
	sh.serveSelfStats(rw, &http.Request{Method: "GET"})
	if rw.Code != http.StatusOK {
		t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
	}
	wct := "application/json"
	if gct := rw.Header().Get("Content-Type"); gct != wct {
		t.Errorf("Content-Type = %q, want %q", gct, wct)
	}
	if g := rw.Body.String(); g != string(w) {
		t.Errorf("body = %s, want %s", g, string(w))
	}
}
// TestSelfServeStatsBad checks that serveSelfStats rejects every non-GET
// method listed below with 405 Method Not Allowed.
func TestSelfServeStatsBad(t *testing.T) {
	for _, m := range []string{"PUT", "POST", "DELETE"} {
		sh := &serverHandler{}
		rw := httptest.NewRecorder()
		sh.serveSelfStats(
			rw,
			&http.Request{
				Method: m,
			},
		)
		if rw.Code != http.StatusMethodNotAllowed {
			// The format has three verbs: method, observed code, wanted code.
			t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
		}
	}
}
// TestLeaderServeStatsBad checks that serveLeaderStats rejects every non-GET
// method listed below with 405 Method Not Allowed.
func TestLeaderServeStatsBad(t *testing.T) {
	for _, m := range []string{"PUT", "POST", "DELETE"} {
		sh := &serverHandler{}
		rw := httptest.NewRecorder()
		sh.serveLeaderStats(
			rw,
			&http.Request{
				Method: m,
			},
		)
		if rw.Code != http.StatusMethodNotAllowed {
			// The format has three verbs: method, observed code, wanted code.
			t.Errorf("method %s: code=%d, want %d", m, rw.Code, http.StatusMethodNotAllowed)
		}
	}
}
// TestServeLeaderStats checks that serveLeaderStats responds to a GET with
// status 200, a JSON content type, and a body equal to the JSON encoding of
// the LeaderStats supplied by the stats provider.
func TestServeLeaderStats(t *testing.T) {
	ls := &stats.LeaderStats{
		Leader: "foobar",
	}
	w, err := json.Marshal(ls)
	if err != nil {
		// Fatalf, not Fatal: the message contains a formatting verb.
		t.Fatalf("error marshaling: %v", err)
	}
	sh := &serverHandler{
		stats: &dummyServerStats{
			ls: ls,
		},
	}
	rw := httptest.NewRecorder()
	sh.serveLeaderStats(rw, &http.Request{Method: "GET"})
	if rw.Code != http.StatusOK {
		t.Errorf("code = %d, want %d", rw.Code, http.StatusOK)
	}
	wct := "application/json"
	if gct := rw.Header().Get("Content-Type"); gct != wct {
		t.Errorf("Content-Type = %q, want %q", gct, wct)
	}
	if g := rw.Body.String(); g != string(w) {
		t.Errorf("body = %s, want %s", g, string(w))
	}
}
// dummyStoreStats is a test double whose JSON method returns a fixed payload.
type dummyStoreStats struct {
	data []byte
}

// JSON returns the payload the dummy was constructed with, unmodified.
func (dss *dummyStoreStats) JSON() []byte {
	payload := dss.data
	return payload
}
// TestServeStoreStats verifies that a GET to serveStoreStats yields status
// 200, a JSON content type, and exactly the bytes returned by the StoreStats
// implementation.
func TestServeStoreStats(t *testing.T) {
	const w = "foobarbaz"

	handler := &serverHandler{
		storestats: &dummyStoreStats{data: []byte(w)},
	}
	rec := httptest.NewRecorder()
	req := &http.Request{Method: "GET"}
	handler.serveStoreStats(rec, req)

	if rec.Code != http.StatusOK {
		t.Errorf("code = %d, want %d", rec.Code, http.StatusOK)
	}

	wantType := "application/json"
	gotType := rec.Header().Get("Content-Type")
	if gotType != wantType {
		t.Errorf("Content-Type = %q, want %q", gotType, wantType)
	}

	gotBody := rec.Body.String()
	if gotBody != w {
		t.Errorf("body = %s, want %s", gotBody, w)
	}
}
func TestAllowMethod(t *testing.T) {
tests := []struct {
m string

View File

@ -6,12 +6,14 @@ import (
"log"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
"github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/coreos/etcd/discovery"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/pkg/pbutil"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
@ -88,6 +90,20 @@ type Server interface {
RemoveMember(ctx context.Context, id uint64) error
}
// ServerStats is implemented by types that can report a server's own
// statistics and its statistics about followers.
type ServerStats interface {
// SelfStats returns the statistics of this server
SelfStats() *stats.ServerStats
// LeaderStats returns the statistics of all followers in the cluster
// if this server is leader. Otherwise, nil is returned.
LeaderStats() *stats.LeaderStats
}
// StoreStats is implemented by types that can report store statistics as
// pre-encoded JSON.
// NOTE(review): the EtcdServer method visible in this change is
// StoreStats() []byte, not JSON() - confirm which type is meant to satisfy
// this interface.
type StoreStats interface {
// JSON returns statistics of the underlying Store used by the
// EtcdServer, in JSON format
JSON() []byte
}
type RaftTimer interface {
Index() uint64
Term() uint64
@ -105,6 +121,9 @@ type EtcdServer struct {
node raft.Node
store store.Store
stats *stats.ServerStats
lstats *stats.LeaderStats
// send specifies the send function for sending msgs to members. send
// MUST NOT block. It is okay to drop messages, since clients should
// timeout and reissue their messages. If send is nil, server will
@ -172,6 +191,13 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
}
cls := &clusterStore{Store: st}
sstats := &stats.ServerStats{
Name: cfg.Name,
ID: strconv.FormatUint(cfg.ID(), 16),
}
lstats := stats.NewLeaderStats(strconv.FormatUint(cfg.ID(), 16))
s := &EtcdServer{
store: st,
node: n,
@ -181,7 +207,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
*wal.WAL
*snap.Snapshotter
}{w, ss},
send: Sender(cfg.Transport, cls),
stats: sstats,
lstats: lstats,
send: Sender(cfg.Transport, cls, sstats, lstats),
ticker: time.Tick(100 * time.Millisecond),
syncTicker: time.Tick(500 * time.Millisecond),
snapCount: cfg.SnapCount,
@ -208,6 +236,7 @@ func (s *EtcdServer) start() {
}
s.w = wait.New()
s.done = make(chan struct{})
s.stats.Initialize()
// TODO: if this is an empty log, writes all peer infos
// into the first entry
go s.run()
@ -334,6 +363,22 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
}
// SelfStats refreshes the derived fields of this server's statistics (the
// leader uptime string and the send/receive package and bandwidth rates)
// and returns the server's ServerStats.
//
// The refresh runs while holding the ServerStats mutex, which is the lock
// RecvAppendReq and SendAppendReq take before changing LeaderInfo and the
// counters, so this method does not read or write those fields concurrently
// with them. The returned pointer is the shared instance, not a copy, so
// callers that read it afterwards are not protected by that lock.
func (s *EtcdServer) SelfStats() *stats.ServerStats {
	s.stats.Lock()
	defer s.stats.Unlock()

	s.stats.LeaderInfo.Uptime = time.Since(s.stats.LeaderInfo.StartTime).String()
	s.stats.SendingPkgRate, s.stats.SendingBandwidthRate = s.stats.SendRates()
	s.stats.RecvingPkgRate, s.stats.RecvingBandwidthRate = s.stats.RecvRates()
	return s.stats
}
// LeaderStats returns the LeaderStats held by this server. As written, the
// same pointer is returned whether or not the server is currently leader,
// and no lock is taken.
func (s *EtcdServer) LeaderStats() *stats.LeaderStats {
// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
return s.lstats
}
// StoreStats returns the bytes produced by the underlying store's JsonStats
// method.
func (s *EtcdServer) StoreStats() []byte {
return s.store.JsonStats()
}
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
// TODO: move Member to protobuf type
b, err := json.Marshal(memb)

View File

@ -0,0 +1,68 @@
package stats
import (
"math"
"time"
)
// LeaderStats is used by the leader in an etcd cluster, and encapsulates
// statistics about communication with its followers
type LeaderStats struct {
// TODO(jonboulle): clarify that these are IDs, not names
// Leader is the id string passed to NewLeaderStats.
Leader string `json:"leader"`
// Followers maps a follower identifier string to that follower's statistics.
Followers map[string]*FollowerStats `json:"followers"`
}
// NewLeaderStats builds a LeaderStats whose Leader field is the given id and
// whose Followers map is allocated and empty, ready to be written to.
func NewLeaderStats(id string) *LeaderStats {
	ls := new(LeaderStats)
	ls.Leader = id
	ls.Followers = map[string]*FollowerStats{}
	return ls
}
// FollowerStats encapsulates various statistics about a follower in an etcd cluster
type FollowerStats struct {
	// Latency summarises the durations passed to Succ, in milliseconds.
	Latency struct {
		Current float64 `json:"current"`
		Average float64 `json:"average"`
		// averageSquare is the running mean of the squared latencies; it is
		// only kept to derive StandardDeviation and is not serialised.
		averageSquare     float64
		StandardDeviation float64 `json:"standardDeviation"`
		Minimum           float64 `json:"minimum"`
		Maximum           float64 `json:"maximum"`
	} `json:"latency"`

	// Counts tallies how many times Succ and Fail have been called.
	Counts struct {
		Fail    uint64 `json:"fail"`
		Success uint64 `json:"success"`
	} `json:"counts"`
}

// Succ updates the FollowerStats with a successful send that took d.
//
// It increments the success count and folds d (converted to milliseconds)
// into the current, minimum, maximum, average and standard-deviation
// latency fields.
func (fs *FollowerStats) Succ(d time.Duration) {
	// Recover the running sums from the stored means before the count changes.
	total := float64(fs.Counts.Success) * fs.Latency.Average
	totalSquare := float64(fs.Counts.Success) * fs.Latency.averageSquare

	fs.Counts.Success++

	fs.Latency.Current = float64(d) / float64(time.Millisecond)

	if fs.Latency.Current > fs.Latency.Maximum {
		fs.Latency.Maximum = fs.Latency.Current
	}

	// The first sample always seeds the minimum. Without this a zero-valued
	// FollowerStats would report a minimum of 0 forever, and callers would
	// have to pre-load a sentinel before the first call.
	if fs.Counts.Success == 1 || fs.Latency.Current < fs.Latency.Minimum {
		fs.Latency.Minimum = fs.Latency.Current
	}

	fs.Latency.Average = (total + fs.Latency.Current) / float64(fs.Counts.Success)
	fs.Latency.averageSquare = (totalSquare + fs.Latency.Current*fs.Latency.Current) / float64(fs.Counts.Success)

	// sdv = sqrt(avg(x^2) - avg(x)^2)
	// Floating-point rounding can push the difference slightly below zero
	// (for example when every sample is identical). math.Sqrt would then
	// return NaN, which encoding/json refuses to marshal, so clamp at zero.
	variance := fs.Latency.averageSquare - fs.Latency.Average*fs.Latency.Average
	if variance < 0 {
		variance = 0
	}
	fs.Latency.StandardDeviation = math.Sqrt(variance)
}

// Fail updates the FollowerStats with an unsuccessful send
func (fs *FollowerStats) Fail() {
	fs.Counts.Fail++
}

96
etcdserver/stats/queue.go Normal file
View File

@ -0,0 +1,96 @@
package stats
import (
"sync"
"time"
)
const (
	// queueCapacity is the fixed number of samples a statsQueue can hold.
	queueCapacity = 200
)

// RequestStats represent the stats for a request.
// It encapsulates the sending time and the size of the request.
type RequestStats struct {
	SendingTime time.Time
	Size        int
}

// statsQueue is a fixed-capacity ring buffer of request samples. Once full,
// inserting a new sample overwrites the oldest one. A queue must be created
// with back set to -1 so that the first Insert lands in slot 0.
type statsQueue struct {
	items        [queueCapacity]*RequestStats
	size         int
	front        int
	back         int
	totalReqSize int
	rwl          sync.RWMutex
}

// Len returns the number of samples currently in the queue.
func (q *statsQueue) Len() int {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	return q.size
}

// ReqSize returns the sum of the Size fields of the samples currently in
// the queue.
func (q *statsQueue) ReqSize() int {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	return q.totalReqSize
}

// frontAndBack gets the front and back elements in the queue
// We must grab front and back together with the protection of the lock
func (q *statsQueue) frontAndBack() (*RequestStats, *RequestStats) {
	q.rwl.RLock()
	defer q.rwl.RUnlock()
	if q.size != 0 {
		return q.items[q.front], q.items[q.back]
	}
	return nil, nil
}

// Insert function insert a RequestStats into the queue and update the records
func (q *statsQueue) Insert(p *RequestStats) {
	q.rwl.Lock()
	defer q.rwl.Unlock()

	q.back = (q.back + 1) % queueCapacity

	if q.size == queueCapacity { //dequeue
		// The slot about to be overwritten is the current front; drop its
		// contribution and advance front to the next-oldest sample.
		q.totalReqSize -= q.items[q.front].Size
		q.front = (q.back + 1) % queueCapacity
	} else {
		q.size++
	}

	q.items[q.back] = p
	q.totalReqSize += q.items[q.back].Size
}

// Rate function returns the package rate and byte rate
//
// Both values are per second, computed over the span between the oldest and
// newest samples. It returns 0, 0 when the queue is empty, when the newest
// sample is more than a second old (the queue is also cleared in that case),
// or when the samples span no time at all.
func (q *statsQueue) Rate() (float64, float64) {
	// Take one consistent snapshot of everything the calculation needs, so
	// the count and byte total match the front/back samples they describe.
	q.rwl.RLock()
	var front, back *RequestStats
	if q.size != 0 {
		front, back = q.items[q.front], q.items[q.back]
	}
	count, total := q.size, q.totalReqSize
	q.rwl.RUnlock()

	if front == nil || back == nil {
		return 0, 0
	}

	if time.Since(back.SendingTime) > time.Second {
		q.Clear()
		return 0, 0
	}

	sampleDuration := back.SendingTime.Sub(front.SendingTime)
	// A single sample, or several with the same timestamp, span zero time.
	// Dividing by that would produce +Inf or NaN, which encoding/json cannot
	// marshal, so report no rate until there is a measurable interval.
	if sampleDuration <= 0 {
		return 0, 0
	}

	pr := float64(count) / float64(sampleDuration) * float64(time.Second)
	br := float64(total) / float64(sampleDuration) * float64(time.Second)

	return pr, br
}

// Clear function clear up the statsQueue
func (q *statsQueue) Clear() {
	q.rwl.Lock()
	defer q.rwl.Unlock()
	q.back = -1
	q.front = 0
	q.size = 0
	q.totalReqSize = 0
}

110
etcdserver/stats/server.go Normal file
View File

@ -0,0 +1,110 @@
package stats
import (
"sync"
"time"
"github.com/coreos/etcd/raft"
)
// ServerStats encapsulates various statistics about an EtcdServer and its
// communication with other members of the cluster
type ServerStats struct {
	Name string `json:"name"`
	// TODO(jonboulle): use ID instead of name?
	ID        string         `json:"id"`
	State     raft.StateType `json:"state"`
	StartTime time.Time      `json:"startTime"`

	// LeaderInfo describes the leader as last observed by this server.
	LeaderInfo struct {
		Name      string    `json:"leader"`
		Uptime    string    `json:"uptime"`
		StartTime time.Time `json:"startTime"`
	} `json:"leaderInfo"`

	// Receive-side counters. The tag on RecvAppendRequestCnt previously ended
	// in a stray comma (an empty option list); the key name is unchanged.
	RecvAppendRequestCnt uint64  `json:"recvAppendRequestCnt"`
	RecvingPkgRate       float64 `json:"recvPkgRate,omitempty"`
	RecvingBandwidthRate float64 `json:"recvBandwidthRate,omitempty"`

	// Send-side counters.
	SendAppendRequestCnt uint64  `json:"sendAppendRequestCnt"`
	SendingPkgRate       float64 `json:"sendPkgRate,omitempty"`
	SendingBandwidthRate float64 `json:"sendBandwidthRate,omitempty"`

	// Sample queues backing the rate fields; allocated by Initialize.
	sendRateQueue *statsQueue
	recvRateQueue *statsQueue

	// Mutex is held by RecvAppendReq and SendAppendReq while they update
	// the fields above.
	sync.Mutex
}
// Initialize stamps the server and leader start times with the current time
// and installs fresh, empty send and receive rate queues. Calling it on a
// nil receiver is a no-op.
func (ss *ServerStats) Initialize() {
	if ss != nil {
		started := time.Now()
		ss.StartTime, ss.LeaderInfo.StartTime = started, started

		// back starts at -1 so the first insert lands in slot 0.
		ss.sendRateQueue = &statsQueue{back: -1}
		ss.recvRateQueue = &statsQueue{back: -1}
	}
}
// RecvRates reports the two rates computed by the receive queue for
// received append requests.
func (ss *ServerStats) RecvRates() (float64, float64) {
	pkgRate, byteRate := ss.recvRateQueue.Rate()
	return pkgRate, byteRate
}
// SendRates reports the two rates computed by the send queue for sent
// append requests.
func (ss *ServerStats) SendRates() (float64, float64) {
	pkgRate, byteRate := ss.sendRateQueue.Rate()
	return pkgRate, byteRate
}
// RecvAppendReq records the receipt of an append request of reqSize bytes
// from the given leader. Under the ServerStats lock it marks this server as
// a follower, resets the leader name and start time if the leader differs
// from the recorded one, queues a sample for rate calculation, and bumps the
// receive counter.
func (ss *ServerStats) RecvAppendReq(leader string, reqSize int) {
	ss.Lock()
	defer ss.Unlock()

	received := time.Now()

	ss.State = raft.StateFollower
	if ss.LeaderInfo.Name != leader {
		ss.LeaderInfo.Name, ss.LeaderInfo.StartTime = leader, received
	}

	sample := &RequestStats{SendingTime: received, Size: reqSize}
	ss.recvRateQueue.Insert(sample)

	ss.RecvAppendRequestCnt++
}
// SendAppendReq records that this server sent an append request of reqSize
// bytes. Under the ServerStats lock it switches the recorded state to leader
// (naming itself as leader and resetting the leader start time) if it was
// not already, queues a sample for rate calculation, and bumps the send
// counter.
func (ss *ServerStats) SendAppendReq(reqSize int) {
	ss.Lock()
	defer ss.Unlock()

	sentAt := time.Now()

	if alreadyLeader := ss.State == raft.StateLeader; !alreadyLeader {
		ss.State = raft.StateLeader
		ss.LeaderInfo.Name, ss.LeaderInfo.StartTime = ss.ID, sentAt
	}

	sample := &RequestStats{SendingTime: sentAt, Size: reqSize}
	ss.sendRateQueue.Insert(sample)

	ss.SendAppendRequestCnt++
}

View File

@ -34,6 +34,10 @@ func (st StateType) String() string {
return stmap[uint64(st)]
}
// MarshalJSON encodes the state as its String() form wrapped in double
// quotes (using fmt's %q verb). It never returns an error.
func (st StateType) MarshalJSON() ([]byte, error) {
	quoted := fmt.Sprintf("%q", st.String())
	return []byte(quoted), nil
}
type progress struct {
match, next uint64
}