Merge pull request #1319 from jonboulle/stats_race

etcdserver: fix data race in retrieving self stats
release-2.0
Jonathan Boulle 2014-10-17 09:04:17 -07:00
commit c5ba66e6aa
7 changed files with 79 additions and 82 deletions

View File

@ -6,7 +6,6 @@ import (
"fmt"
"log"
"net/http"
"strconv"
"time"
etcdErr "github.com/coreos/etcd/error"
@ -148,7 +147,7 @@ func send(c *http.Client, cls ClusterStore, m raftpb.Message, ss *stats.ServerSt
if m.Type == raftpb.MsgApp {
ss.SendAppendReq(len(data))
}
to := strconv.FormatUint(m.To, 16)
to := idAsHex(m.To)
fs := ls.Follower(to)
start := time.Now()

View File

@ -76,8 +76,7 @@ func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
type serverHandler struct {
timeout time.Duration
server etcdserver.Server
stats etcdserver.ServerStats
storestats etcdserver.StoreStats
stats etcdserver.Stats
timer etcdserver.RaftTimer
clusterStore etcdserver.ClusterStore
}
@ -175,37 +174,23 @@ func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(h.storestats.JSON())
w.Write(h.stats.StoreStats())
}
func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
s := h.stats.SelfStats()
b, err := json.Marshal(s)
if err != nil {
log.Printf("error marshalling stats: %v\n", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(b)
w.Write(h.stats.SelfStats())
}
func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
if !allowMethod(w, r.Method, "GET") {
return
}
s := h.stats.LeaderStats()
b, err := json.Marshal(s)
if err != nil {
log.Printf("error marshalling stats: %v\n", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(b)
w.Write(h.stats.LeaderStats())
}
func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
@ -227,8 +212,7 @@ func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
}
log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
if m.Type == raftpb.MsgApp {
// TODO(jonboulle): standardize id uint-->string process: always base 16?
h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength))
h.stats.UpdateRecvApp(m.From, r.ContentLength)
}
if err := h.server.Process(context.TODO(), m); err != nil {
log.Println("etcdhttp: error processing raft message:", err)

View File

@ -19,7 +19,6 @@ import (
etcdErr "github.com/coreos/etcd/error"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/etcdserver/stats"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/store"
)
@ -638,27 +637,20 @@ func TestServeMachines(t *testing.T) {
}
}
type dummyServerStats struct {
ss *stats.ServerStats
ls *stats.LeaderStats
type dummyStats struct {
data []byte
}
func (dss *dummyServerStats) SelfStats() *stats.ServerStats { return dss.ss }
func (dss *dummyServerStats) LeaderStats() *stats.LeaderStats { return dss.ls }
func (ds *dummyStats) SelfStats() []byte { return ds.data }
func (ds *dummyStats) LeaderStats() []byte { return ds.data }
func (ds *dummyStats) StoreStats() []byte { return ds.data }
func (ds *dummyStats) UpdateRecvApp(_ uint64, _ int64) {}
func TestServeSelfStats(t *testing.T) {
ss := &stats.ServerStats{
Name: "foobar",
RecvingPkgRate: 123.4,
}
w, err := json.Marshal(ss)
if err != nil {
t.Fatal("error marshaling: %v", err)
}
wb := []byte("some statistics")
w := string(wb)
sh := &serverHandler{
stats: &dummyServerStats{
ss: ss,
},
stats: &dummyStats{data: wb},
}
rw := httptest.NewRecorder()
sh.serveSelfStats(rw, &http.Request{Method: "GET"})
@ -669,8 +661,8 @@ func TestServeSelfStats(t *testing.T) {
if gct := rw.Header().Get("Content-Type"); gct != wct {
t.Errorf("Content-Type = %q, want %q", gct, wct)
}
if g := rw.Body.String(); g != string(w) {
t.Errorf("body = %s, want %s", g, string(w))
if g := rw.Body.String(); g != w {
t.Errorf("body = %s, want %s", g, w)
}
}
@ -707,17 +699,10 @@ func TestLeaderServeStatsBad(t *testing.T) {
}
func TestServeLeaderStats(t *testing.T) {
ls := &stats.LeaderStats{
Leader: "foobar",
}
w, err := json.Marshal(ls)
if err != nil {
t.Fatal("error marshaling: %v", err)
}
wb := []byte("some statistics")
w := string(wb)
sh := &serverHandler{
stats: &dummyServerStats{
ls: ls,
},
stats: &dummyStats{data: wb},
}
rw := httptest.NewRecorder()
sh.serveLeaderStats(rw, &http.Request{Method: "GET"})
@ -728,21 +713,16 @@ func TestServeLeaderStats(t *testing.T) {
if gct := rw.Header().Get("Content-Type"); gct != wct {
t.Errorf("Content-Type = %q, want %q", gct, wct)
}
if g := rw.Body.String(); g != string(w) {
t.Errorf("body = %s, want %s", g, string(w))
if g := rw.Body.String(); g != w {
t.Errorf("body = %s, want %s", g, w)
}
}
type dummyStoreStats struct {
data []byte
}
func (dss *dummyStoreStats) JSON() []byte { return dss.data }
func TestServeStoreStats(t *testing.T) {
w := "foobarbaz"
wb := []byte("some statistics")
w := string(wb)
sh := &serverHandler{
storestats: &dummyStoreStats{data: []byte(w)},
stats: &dummyStats{data: wb},
}
rw := httptest.NewRecorder()
sh.serveStoreStats(rw, &http.Request{Method: "GET"})

View File

@ -55,7 +55,7 @@ func newMember(name string, peerURLs types.URLs, now *time.Time) *Member {
}
func (m Member) storeKey() string {
return path.Join(membersKVPrefix, strconv.FormatUint(m.ID, 16))
return path.Join(membersKVPrefix, idAsHex(m.ID))
}
func parseMemberID(key string) uint64 {

View File

@ -90,18 +90,16 @@ type Server interface {
RemoveMember(ctx context.Context, id uint64) error
}
type ServerStats interface {
// SelfStats returns the statistics of this server
SelfStats() *stats.ServerStats
type Stats interface {
// SelfStats returns the struct representing statistics of this server
SelfStats() []byte
// LeaderStats returns the statistics of all followers in the cluster
// if this server is leader. Otherwise, nil is returned.
LeaderStats() *stats.LeaderStats
}
type StoreStats interface {
// JSON returns statistics of the underlying Store used by the
// EtcdServer, in JSON format
JSON() []byte
LeaderStats() []byte
// StoreStats returns statistics of the store backing this EtcdServer
StoreStats() []byte
// UpdateRecvApp updates the underlying statistics in response to a receiving an Append request
UpdateRecvApp(from uint64, length int64)
}
type RaftTimer interface {
@ -194,9 +192,9 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
sstats := &stats.ServerStats{
Name: cfg.Name,
ID: strconv.FormatUint(cfg.ID(), 16),
ID: idAsHex(cfg.ID()),
}
lstats := stats.NewLeaderStats(strconv.FormatUint(cfg.ID(), 16))
lstats := stats.NewLeaderStats(idAsHex(cfg.ID()))
s := &EtcdServer{
store: st,
@ -363,22 +361,23 @@ func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
}
}
func (s *EtcdServer) SelfStats() *stats.ServerStats {
s.stats.LeaderInfo.Uptime = time.Now().Sub(s.stats.LeaderInfo.StartTime).String()
s.stats.SendingPkgRate, s.stats.SendingBandwidthRate = s.stats.SendRates()
s.stats.RecvingPkgRate, s.stats.RecvingBandwidthRate = s.stats.RecvRates()
return s.stats
func (s *EtcdServer) SelfStats() []byte {
return s.stats.JSON()
}
func (s *EtcdServer) LeaderStats() *stats.LeaderStats {
func (s *EtcdServer) LeaderStats() []byte {
// TODO(jonboulle): need to lock access to lstats, set it to nil when not leader, ...
return s.lstats
return s.lstats.JSON()
}
func (s *EtcdServer) StoreStats() []byte {
return s.store.JsonStats()
}
func (s *EtcdServer) UpdateRecvApp(from uint64, length int64) {
s.stats.RecvAppendReq(idAsHex(from), int(length))
}
func (s *EtcdServer) AddMember(ctx context.Context, memb Member) error {
// TODO: move Member to protobuf type
b, err := json.Marshal(memb)
@ -680,3 +679,7 @@ func containsUint64(a []uint64, x uint64) bool {
}
return false
}
func idAsHex(id uint64) string {
return strconv.FormatUint(id, 16)
}

View File

@ -1,6 +1,8 @@
package stats
import (
"encoding/json"
"log"
"math"
"sync"
"time"
@ -24,6 +26,18 @@ func NewLeaderStats(id string) *LeaderStats {
}
}
func (ls *LeaderStats) JSON() []byte {
ls.Lock()
stats := *ls
ls.Unlock()
b, err := json.Marshal(stats)
// TODO(jonboulle): appropriate error handling?
if err != nil {
log.Printf("error marshalling leader stats: %v", err)
}
return b
}
func (ls *LeaderStats) Follower(name string) *FollowerStats {
ls.Lock()
defer ls.Unlock()

View File

@ -1,6 +1,8 @@
package stats
import (
"encoding/json"
"log"
"sync"
"time"
@ -36,6 +38,21 @@ type ServerStats struct {
sync.Mutex
}
func (ss *ServerStats) JSON() []byte {
ss.Lock()
stats := *ss
ss.Unlock()
stats.LeaderInfo.Uptime = time.Now().Sub(stats.LeaderInfo.StartTime).String()
stats.SendingPkgRate, stats.SendingBandwidthRate = stats.SendRates()
stats.RecvingPkgRate, stats.RecvingBandwidthRate = stats.RecvRates()
b, err := json.Marshal(stats)
// TODO(jonboulle): appropriate error handling?
if err != nil {
log.Printf("error marshalling server stats: %v", err)
}
return b
}
// Initialize clears the statistics of ServerStats and resets its start time
func (ss *ServerStats) Initialize() {
if ss == nil {