Merge pull request #7946 from heyitsanthony/report-weighted

report: add NewWeightedReport
release-3.2
Anthony Romano 2017-05-17 21:04:53 -07:00 committed by GitHub
commit 4e84bd2e3c
3 changed files with 184 additions and 62 deletions
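
For context, a minimal usage sketch (not part of the diff; the import path and the doBatch helper are illustrative assumptions): a benchmark driver that times batched operations can report one Result per batch, weighted by batch size, and the weighted report then emits both the weighted and the unweighted summaries.

    package main

    import (
        "fmt"
        "time"

        "github.com/coreos/etcd/pkg/report"
    )

    // doBatch stands in for a real operation that covers three
    // logical requests (e.g. a txn touching three keys).
    func doBatch() { time.Sleep(time.Millisecond) }

    func main() {
        // Wrap an unweighted report; the wrapper prints both summaries.
        r := report.NewWeightedReport(report.NewReport("%4.4f"), "%4.4f")

        go func() {
            for i := 0; i < 100; i++ {
                st := time.Now()
                doBatch()
                // Weight: 3 makes this one timing count as three requests.
                r.Results() <- report.Result{Start: st, End: time.Now(), Weight: 3}
            }
            close(r.Results())
        }()

        // Run() yields the weighted summary, then the base report's.
        for line := range r.Run() {
            fmt.Println(line)
        }
    }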

pkg/report/report.go

@@ -30,9 +30,10 @@ const (
 // Result describes the timings for an operation.
 type Result struct {
-    Start time.Time
-    End   time.Time
-    Err   error
+    Start  time.Time
+    End    time.Time
+    Err    error
+    Weight float64
 }
 
 func (res *Result) Duration() time.Duration { return res.End.Sub(res.Start) }
@@ -41,18 +42,8 @@ type report struct {
     results   chan Result
     precision string
 
-    avgTotal float64
-    fastest  float64
-    slowest  float64
-    average  float64
-    stddev   float64
-    rps      float64
-    total    time.Duration
-
-    errorDist map[string]int
-    lats      []float64
-
-    sps *secondPoints
+    stats Stats
+    sps   *secondPoints
 }
 
 // Stats exposes results raw data.
@@ -69,6 +60,13 @@ type Stats struct {
     TimeSeries TimeSeries
 }
 
+func (s *Stats) copy() Stats {
+    ss := *s
+    ss.ErrorDist = copyMap(ss.ErrorDist)
+    ss.Lats = copyFloats(ss.Lats)
+    return ss
+}
+
 // Report processes a result stream until it is closed, then produces a
 // string with information about the consumed result data.
 type Report interface {
@@ -81,12 +79,15 @@ type Report interface {
     Stats() <-chan Stats
 }
 
-func NewReport(precision string) Report {
-    return &report{
+func NewReport(precision string) Report { return newReport(precision) }
+
+func newReport(precision string) *report {
+    r := &report{
         results:   make(chan Result, 16),
         precision: precision,
-        errorDist: make(map[string]int),
     }
+    r.stats.ErrorDist = make(map[string]int)
+    return r
 }
 
 func NewReportSample(precision string) Report {
@@ -112,22 +113,11 @@ func (r *report) Stats() <-chan Stats {
     go func() {
         defer close(donec)
         r.processResults()
-        var ts TimeSeries
+        s := r.stats.copy()
         if r.sps != nil {
-            ts = r.sps.getTimeSeries()
-        }
-        donec <- Stats{
-            AvgTotal:   r.avgTotal,
-            Fastest:    r.fastest,
-            Slowest:    r.slowest,
-            Average:    r.average,
-            Stddev:     r.stddev,
-            RPS:        r.rps,
-            Total:      r.total,
-            ErrorDist:  copyMap(r.errorDist),
-            Lats:       copyFloats(r.lats),
-            TimeSeries: ts,
+            s.TimeSeries = r.sps.getTimeSeries()
         }
+        donec <- s
     }()
     return donec
 }
@@ -147,21 +137,21 @@ func copyFloats(s []float64) (c []float64) {
 }
 
 func (r *report) String() (s string) {
-    if len(r.lats) > 0 {
+    if len(r.stats.Lats) > 0 {
         s += fmt.Sprintf("\nSummary:\n")
-        s += fmt.Sprintf(" Total:\t%s.\n", r.sec2str(r.total.Seconds()))
-        s += fmt.Sprintf(" Slowest:\t%s.\n", r.sec2str(r.slowest))
-        s += fmt.Sprintf(" Fastest:\t%s.\n", r.sec2str(r.fastest))
-        s += fmt.Sprintf(" Average:\t%s.\n", r.sec2str(r.average))
-        s += fmt.Sprintf(" Stddev:\t%s.\n", r.sec2str(r.stddev))
-        s += fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps)
+        s += fmt.Sprintf(" Total:\t%s.\n", r.sec2str(r.stats.Total.Seconds()))
+        s += fmt.Sprintf(" Slowest:\t%s.\n", r.sec2str(r.stats.Slowest))
+        s += fmt.Sprintf(" Fastest:\t%s.\n", r.sec2str(r.stats.Fastest))
+        s += fmt.Sprintf(" Average:\t%s.\n", r.sec2str(r.stats.Average))
+        s += fmt.Sprintf(" Stddev:\t%s.\n", r.sec2str(r.stats.Stddev))
+        s += fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.stats.RPS)
         s += r.histogram()
         s += r.sprintLatencies()
         if r.sps != nil {
             s += fmt.Sprintf("%v\n", r.sps.getTimeSeries())
         }
     }
-    if len(r.errorDist) > 0 {
+    if len(r.stats.ErrorDist) > 0 {
         s += r.errors()
     }
     return s
@@ -176,17 +166,17 @@ func NewReportRate(precision string) Report {
 }
 
 func (r *reportRate) String() string {
-    return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.rps)
+    return fmt.Sprintf(" Requests/sec:\t"+r.precision+"\n", r.stats.RPS)
 }
 
 func (r *report) processResult(res *Result) {
     if res.Err != nil {
-        r.errorDist[res.Err.Error()]++
+        r.stats.ErrorDist[res.Err.Error()]++
         return
     }
     dur := res.Duration()
-    r.lats = append(r.lats, dur.Seconds())
-    r.avgTotal += dur.Seconds()
+    r.stats.Lats = append(r.stats.Lats, dur.Seconds())
+    r.stats.AvgTotal += dur.Seconds()
     if r.sps != nil {
         r.sps.Add(res.Start, dur)
     }
@@ -197,19 +187,19 @@ func (r *report) processResults() {
     for res := range r.results {
         r.processResult(&res)
     }
-    r.total = time.Since(st)
+    r.stats.Total = time.Since(st)
 
-    r.rps = float64(len(r.lats)) / r.total.Seconds()
-    r.average = r.avgTotal / float64(len(r.lats))
-    for i := range r.lats {
-        dev := r.lats[i] - r.average
-        r.stddev += dev * dev
+    r.stats.RPS = float64(len(r.stats.Lats)) / r.stats.Total.Seconds()
+    r.stats.Average = r.stats.AvgTotal / float64(len(r.stats.Lats))
+    for i := range r.stats.Lats {
+        dev := r.stats.Lats[i] - r.stats.Average
+        r.stats.Stddev += dev * dev
     }
-    r.stddev = math.Sqrt(r.stddev / float64(len(r.lats)))
-    sort.Float64s(r.lats)
-    if len(r.lats) > 0 {
-        r.fastest = r.lats[0]
-        r.slowest = r.lats[len(r.lats)-1]
+    r.stats.Stddev = math.Sqrt(r.stats.Stddev / float64(len(r.stats.Lats)))
+    sort.Float64s(r.stats.Lats)
+    if len(r.stats.Lats) > 0 {
+        r.stats.Fastest = r.stats.Lats[0]
+        r.stats.Slowest = r.stats.Lats[len(r.stats.Lats)-1]
     }
 }
@@ -235,7 +225,7 @@ func percentiles(nums []float64) (data []float64) {
 }
 
 func (r *report) sprintLatencies() string {
-    data := percentiles(r.lats)
+    data := percentiles(r.stats.Lats)
     s := fmt.Sprintf("\nLatency distribution:\n")
     for i := 0; i < len(pctls); i++ {
         if data[i] > 0 {
@@ -249,15 +239,15 @@ func (r *report) histogram() string {
     bc := 10
     buckets := make([]float64, bc+1)
     counts := make([]int, bc+1)
-    bs := (r.slowest - r.fastest) / float64(bc)
+    bs := (r.stats.Slowest - r.stats.Fastest) / float64(bc)
     for i := 0; i < bc; i++ {
-        buckets[i] = r.fastest + bs*float64(i)
+        buckets[i] = r.stats.Fastest + bs*float64(i)
     }
-    buckets[bc] = r.slowest
+    buckets[bc] = r.stats.Slowest
     var bi int
     var max int
-    for i := 0; i < len(r.lats); {
-        if r.lats[i] <= buckets[bi] {
+    for i := 0; i < len(r.stats.Lats); {
+        if r.stats.Lats[i] <= buckets[bi] {
             i++
             counts[bi]++
             if max < counts[bi] {
@@ -281,7 +271,7 @@ func (r *report) histogram() string {
 
 func (r *report) errors() string {
     s := fmt.Sprintf("\nError distribution:\n")
-    for err, num := range r.errorDist {
+    for err, num := range r.stats.ErrorDist {
         s += fmt.Sprintf(" [%d]\t%s\n", num, err)
     }
     return s

pkg/report/report_test.go

@@ -81,3 +81,34 @@ func TestReport(t *testing.T) {
         }
     }
 }
+
+func TestWeightedReport(t *testing.T) {
+    r := NewWeightedReport(NewReport("%f"), "%f")
+    go func() {
+        start := time.Now()
+        for i := 0; i < 5; i++ {
+            end := start.Add(time.Second)
+            r.Results() <- Result{Start: start, End: end, Weight: 2.0}
+            start = end
+        }
+        r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
+        close(r.Results())
+    }()
+
+    stats := <-r.Stats()
+    stats.TimeSeries = nil // ignore timeseries since it uses wall clock
+    wStats := Stats{
+        AvgTotal:  10.0,
+        Fastest:   0.5,
+        Slowest:   0.5,
+        Average:   0.5,
+        Stddev:    0.0,
+        Total:     stats.Total,
+        RPS:       10.0 / stats.Total.Seconds(),
+        ErrorDist: map[string]int{"oops": 1},
+        Lats:      []float64{0.5, 0.5, 0.5, 0.5, 0.5},
+    }
+    if !reflect.DeepEqual(stats, wStats) {
+        t.Fatalf("got %+v, want %+v", stats, wStats)
+    }
+}
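
The expected values follow from the weighting arithmetic: each of the five successful results spans one second with Weight: 2.0, so processResult rescales each latency to 1s / 2.0 = 0.5s and accumulates weightTotal = 10.0. reweighStat then applies weightCoef = 10.0 / 5 = 2.0, giving RPS = (5 / Total.Seconds()) * 2.0 = 10.0 / Total.Seconds() and AvgTotal = 2.5 * 2.0^2 = 10.0, matching the wStats fixture above.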

pkg/report/weighted.go (new file, 101 lines)

@@ -0,0 +1,101 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// the file is borrowed from github.com/rakyll/boom/boomer/print.go
+
+package report
+
+import (
+    "time"
+)
+
+type weightedReport struct {
+    baseReport Report
+
+    report      *report
+    results     chan Result
+    weightTotal float64
+}
+
+// NewWeightedReport returns a report that includes
+// both weighted and unweighted statistics.
+func NewWeightedReport(r Report, precision string) Report {
+    return &weightedReport{
+        baseReport: r,
+        report:     newReport(precision),
+        results:    make(chan Result, 16),
+    }
+}
+
+func (wr *weightedReport) Results() chan<- Result { return wr.results }
+
+func (wr *weightedReport) Run() <-chan string {
+    donec := make(chan string, 2)
+    go func() {
+        defer close(donec)
+        basec, rc := make(chan string, 1), make(chan Stats, 1)
+        go func() { basec <- (<-wr.baseReport.Run()) }()
+        go func() { rc <- (<-wr.report.Stats()) }()
+        go wr.processResults()
+        wr.report.stats = wr.reweighStat(<-rc)
+        donec <- wr.report.String()
+        donec <- (<-basec)
+    }()
+    return donec
+}
+
+func (wr *weightedReport) Stats() <-chan Stats {
+    donec := make(chan Stats, 2)
+    go func() {
+        defer close(donec)
+        basec, rc := make(chan Stats, 1), make(chan Stats, 1)
+        go func() { basec <- (<-wr.baseReport.Stats()) }()
+        go func() { rc <- (<-wr.report.Stats()) }()
+        go wr.processResults()
+        donec <- wr.reweighStat(<-rc)
+        donec <- (<-basec)
+    }()
+    return donec
+}
+
+func (wr *weightedReport) processResults() {
+    defer close(wr.report.results)
+    defer close(wr.baseReport.Results())
+    for res := range wr.results {
+        wr.processResult(res)
+        wr.baseReport.Results() <- res
+    }
+}
+
+func (wr *weightedReport) processResult(res Result) {
+    if res.Err != nil {
+        wr.report.results <- res
+        return
+    }
+    if res.Weight == 0 {
+        res.Weight = 1.0
+    }
+    wr.weightTotal += res.Weight
+    res.End = res.Start.Add(time.Duration(float64(res.End.Sub(res.Start)) / res.Weight))
+    res.Weight = 1.0
+    wr.report.results <- res
+}
+
+func (wr *weightedReport) reweighStat(s Stats) Stats {
+    weightCoef := wr.weightTotal / float64(len(s.Lats))
+    // weight > 1 => processing more than one request
+    s.RPS *= weightCoef
+    s.AvgTotal *= weightCoef * weightCoef
+    return s
+}
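
A note on reweighStat: processResult has already divided each successful result's duration by its weight, so the inner report computes its Stats over rescaled latencies and an unweighted sample count. reweighStat restores the weighted view by scaling with the mean weight, weightCoef = weightTotal / len(Lats): RPS once (more effective requests per wall-clock second) and AvgTotal twice (both the per-sample latency and the effective sample count were reduced by the weight). With uniform weights, as in the test, this is exact; with mixed weights it appears to approximate via the mean weight. A standalone sketch of the arithmetic (not from the PR):

    package main

    import "fmt"

    func main() {
        weights := []float64{2, 2, 2, 2, 2}
        durs := []float64{1, 1, 1, 1, 1} // seconds, before rescaling

        // processResult: divide each duration by its weight, sum the weights.
        weightTotal, avgTotal := 0.0, 0.0
        for i := range durs {
            weightTotal += weights[i]
            avgTotal += durs[i] / weights[i] // rescaled latency
        }

        // reweighStat: scale by the mean weight, squared for the summed total.
        coef := weightTotal / float64(len(durs)) // 10 / 5 = 2
        fmt.Println(avgTotal * coef * coef)      // 2.5 * 4 = 10, the test's AvgTotal
    }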