Regex support

Osd stats summary at the end
Nautilus support
Define buckets in regexp annotation
Versioning
master
Alexey Kostin 2019-03-11 16:38:50 +03:00
parent 81253e9d36
commit 997422b348
4 changed files with 96 additions and 18 deletions

View File

@ -17,7 +17,7 @@ import (
) )
func bench(cephconn *cephconnection, osddevice Device, buff *[]byte, startbuff *[]byte, params *params, func bench(cephconn *cephconnection, osddevice Device, buff *[]byte, startbuff *[]byte, params *params,
wg *sync.WaitGroup, result chan string, totalLats chan avgLatencies, objectnames []string) { wg *sync.WaitGroup, result chan string, totalLats chan avgLatencies, osdStatsChan chan osdStatLine, objectnames []string) {
defer wg.Done() defer wg.Done()
threadresult := make(chan []time.Duration, params.threadsCount) threadresult := make(chan []time.Duration, params.threadsCount)
var osdlatencies []time.Duration var osdlatencies []time.Duration
@ -126,15 +126,22 @@ func bench(cephconn *cephconnection, osddevice Device, buff *[]byte, startbuff *
avgspeed := float64(iops) * float64(params.blocksize) / 1024 / 1024 avgspeed := float64(iops) * float64(params.blocksize) / 1024 / 1024
avgline := fmt.Sprintf("Avg iops: %-5v Avg speed: %.3f MB/s Total writes count: %-5v Total writes (MB): %-5v\n\n", avgline := fmt.Sprintf("Avg iops: %-5v Avg speed: %.3f MB/s Total writes count: %-5v Total writes (MB): %-5v\n\n",
iops, avgspeed, len(osdlatencies), uint64(len(osdlatencies))*params.blocksize/1024/1024) iops, avgspeed, len(osdlatencies), uint64(len(osdlatencies))*params.blocksize/1024/1024)
osdavgline := fmt.Sprintf("%-8v Avg iops: %-5v Avg speed: %.3f MB/s Total writes count: %-5v Total writes (MB): %-5v",
osddevice.Name, iops, avgspeed, len(osdlatencies), uint64(len(osdlatencies))*params.blocksize/1024/1024)
switch { switch {
case iops < 80: case iops < 80:
buffer.WriteString(darkred(avgline)) buffer.WriteString(darkred(avgline))
osdStatsChan <- osdStatLine{osddevice.ID, darkred(osdavgline)}
case iops < 200: case iops < 200:
buffer.WriteString(red(avgline)) buffer.WriteString(red(avgline))
osdStatsChan <- osdStatLine{osddevice.ID, red(osdavgline)}
case iops < 500: case iops < 500:
buffer.WriteString(yellow(avgline)) buffer.WriteString(yellow(avgline))
osdStatsChan <- osdStatLine{osddevice.ID, yellow(osdavgline)}
default: default:
buffer.WriteString(green(avgline)) buffer.WriteString(green(avgline))
osdStatsChan <- osdStatLine{osddevice.ID, green(osdavgline)}
} }
//sort latencies //sort latencies
@ -200,11 +207,11 @@ func main() {
if params.cpuprofile != "" { if params.cpuprofile != "" {
f, err := os.Create(params.cpuprofile) f, err := os.Create(params.cpuprofile)
if err != nil { if err != nil {
log.Fatal("could not create CPU profile: ", err) log.Fatal("Could not create CPU profile: ", err)
} }
defer f.Close() defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil { if err := pprof.StartCPUProfile(f); err != nil {
log.Fatal("could not start CPU profile: ", err) log.Fatal("Could not start CPU profile: ", err)
} }
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
@ -212,12 +219,12 @@ func main() {
if params.memprofile != "" { if params.memprofile != "" {
f, err := os.Create(params.memprofile) f, err := os.Create(params.memprofile)
if err != nil { if err != nil {
log.Fatal("could not create memory profile: ", err) log.Fatal("Could not create memory profile: ", err)
} }
defer f.Close() defer f.Close()
runtime.GC() // get up-to-date statistics runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil { if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err) log.Fatal("Could not write memory profile: ", err)
} }
} }
cephconn := connectioninit(params) cephconn := connectioninit(params)
@ -235,6 +242,8 @@ func main() {
results := make(chan string, len(osddevices)*int(params.threadsCount)) results := make(chan string, len(osddevices)*int(params.threadsCount))
totalLats := make(chan avgLatencies, len(osddevices)) totalLats := make(chan avgLatencies, len(osddevices))
avgLats := []avgLatencies{} avgLats := []avgLatencies{}
osdStatsChan := make(chan osdStatLine, len(osddevices))
osdStats := map[int64]string{}
log.Println("Calculating objects") log.Println("Calculating objects")
objectnames := map[int64][]string{} objectnames := map[int64][]string{}
@ -266,10 +275,12 @@ func main() {
for _, osd := range osddevices { for _, osd := range osddevices {
wg.Add(1) wg.Add(1)
if params.parallel == true { if params.parallel == true {
go bench(cephconn, osd, &buff, &startbuff, &params, &wg, results, totalLats, objectnames[osd.ID]) go bench(cephconn, osd, &buff, &startbuff, &params, &wg, results, totalLats, osdStatsChan, objectnames[osd.ID])
} else { } else {
bench(cephconn, osd, &buff, &startbuff, &params, &wg, results, totalLats, objectnames[osd.ID]) bench(cephconn, osd, &buff, &startbuff, &params, &wg, results, totalLats, osdStatsChan, objectnames[osd.ID])
avgLats = append(avgLats, <-totalLats) avgLats = append(avgLats, <-totalLats)
osdStat := <-osdStatsChan
osdStats[osdStat.num] = osdStat.line
log.Println(<-results) log.Println(<-results)
} }
@ -280,6 +291,7 @@ func main() {
wg.Wait() wg.Wait()
close(results) close(results)
close(totalLats) close(totalLats)
close(osdStatsChan)
}() }()
for message := range results { for message := range results {
@ -288,8 +300,22 @@ func main() {
for lat := range totalLats { for lat := range totalLats {
avgLats = append(avgLats, lat) avgLats = append(avgLats, lat)
} }
for osdStat := range osdStatsChan {
osdStats[osdStat.num] = osdStat.line
}
} }
//print sorted stats for all osd
var keys []int64
for k := range osdStats {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
for _, k := range keys {
fmt.Println(osdStats[k])
}
fmt.Println()
sumLat := int64(0) sumLat := int64(0)
countLat := int64(0) countLat := int64(0)
for _, avgLat := range avgLats { for _, avgLat := range avgLats {
@ -307,7 +333,7 @@ func main() {
color.Set(color.FgHiYellow) color.Set(color.FgHiYellow)
defer color.Unset() defer color.Unset()
fmt.Printf("Summary avg iops per osd:%5d Summary avg speed per osd: %.3f MB/s\n"+ fmt.Printf("Average iops per osd:%5d Average speed per osd: %.3f MB/s\n"+
"Total writes count:%11d Total writes (MB): %v\n", "Total writes count:%11d Total writes (MB): %v\n",
avgIops, avgSpeed, countLat, uint64(countLat)*params.blocksize/1024/1024) avgIops, avgSpeed, countLat, uint64(countLat)*params.blocksize/1024/1024)
if params.parallel { if params.parallel {

View File

@ -2,13 +2,17 @@ package main
import ( import (
"code.cloudfoundry.org/bytefmt" "code.cloudfoundry.org/bytefmt"
"fmt"
"github.com/juju/gnuflag" "github.com/juju/gnuflag"
"log" "log"
"os"
"time" "time"
) )
func route() params { func route() params {
params := params{} params := params{}
var showversion bool
const version = 1.3
gnuflag.DurationVar(&params.duration, "duration", 30*time.Second, gnuflag.DurationVar(&params.duration, "duration", 30*time.Second,
"Time limit for each test in seconds") "Time limit for each test in seconds")
gnuflag.DurationVar(&params.duration, "d", 30*time.Second, gnuflag.DurationVar(&params.duration, "d", 30*time.Second,
@ -42,7 +46,9 @@ func route() params {
gnuflag.StringVar(&params.pool, "p", "bench", gnuflag.StringVar(&params.pool, "p", "bench",
"Ceph pool") "Ceph pool")
gnuflag.StringVar(&params.define, "define", "", gnuflag.StringVar(&params.define, "define", "",
"Define specifically osd or host. osd.X or ceph-host-X") "Define specifically osd or host. Example: osd.X, ceph-host-X")
gnuflag.StringVar(&params.rdefine, "rdefine", "",
"Rdefine specifically osd or host in Posix Regex (replaces define). Example: osd.X, ceph-host-X, osd.[0-9]1?$, ceph-host-[1-2]~hdd")
gnuflag.Uint64Var(&params.threadsCount, "threads", 1, gnuflag.Uint64Var(&params.threadsCount, "threads", 1,
"Threads count") "Threads count")
gnuflag.Uint64Var(&params.threadsCount, "t", 1, gnuflag.Uint64Var(&params.threadsCount, "t", 1,
@ -53,8 +59,16 @@ func route() params {
"Name of cpuprofile") "Name of cpuprofile")
gnuflag.StringVar(&params.memprofile, "memprofile", "", gnuflag.StringVar(&params.memprofile, "memprofile", "",
"Name of memprofile") "Name of memprofile")
gnuflag.BoolVar(&showversion, "version", false, "Show sversion")
gnuflag.BoolVar(&showversion, "v", false, "Show version")
gnuflag.Parse(true) gnuflag.Parse(true)
if showversion {
fmt.Printf("go-bench version v%v\n", version)
os.Exit(0)
}
blocksize, err := bytefmt.ToBytes(params.bs) blocksize, err := bytefmt.ToBytes(params.bs)
params.blocksize = blocksize params.blocksize = blocksize
if err != nil { if err != nil {

View File

@ -3,6 +3,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"log" "log"
"regexp"
"strings" "strings"
) )
@ -35,7 +36,12 @@ func getPgByPool(cephconn *cephconnection, params params) []PlacementGroup {
"format": "json"}) "format": "json"})
var monanswer []PlacementGroup var monanswer []PlacementGroup
if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil {
log.Fatalf("Can't parse monitor answer. Error: %v", err) //try Nautilus
var nmonanswer placementGroupNautilus
if nerr := json.Unmarshal([]byte(monrawanswer), &nmonanswer); nerr != nil {
log.Fatalf("Can't parse monitor answer in getPgByPool. Error: %v", err)
}
return nmonanswer.PgStats
} }
return monanswer return monanswer
} }
@ -121,8 +127,31 @@ func getOsdForLocations(params params, osdcrushdump OsdCrushDump, osddump OsdDum
var osddevices []Device var osddevices []Device
bucketitems := getCrushHostBuckets(osdcrushdump.Buckets, rootid) bucketitems := getCrushHostBuckets(osdcrushdump.Buckets, rootid)
if params.define != "" {
if strings.HasPrefix(params.define, "osd.") { if params.rdefine != "" { // match regex if exists
validbucket, err := regexp.CompilePOSIX(params.rdefine)
if err != nil {
log.Fatalf("Can't parse regex %v", params.rdefine)
}
for _, hostbucket := range bucketitems {
for _, item := range hostbucket.Items {
for _, device := range osdcrushdump.Devices {
if device.ID == item.ID && (validbucket.MatchString(hostbucket.Name) || validbucket.MatchString(device.Name)) {
for _, osdmetadata := range osdsmetadata {
if osdmetadata.ID == device.ID && osdstats[uint64(device.ID)].Up == 1 && osdstats[uint64(device.ID)].In == 1 {
device.Info = osdmetadata
osddevices = append(osddevices, device)
}
}
}
}
}
}
if len(osddevices) == 0 {
log.Fatalf("Defined host/osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool)
}
} else if params.define != "" { // check defined osd/hosts
if strings.HasPrefix(params.define, "osd.") { //check that defined is osd, else host
for _, hostbucket := range bucketitems { for _, hostbucket := range bucketitems {
for _, item := range hostbucket.Items { for _, item := range hostbucket.Items {
for _, device := range osdcrushdump.Devices { for _, device := range osdcrushdump.Devices {
@ -152,7 +181,6 @@ func getOsdForLocations(params params, osdcrushdump OsdCrushDump, osddump OsdDum
device.Info = osdmetadata device.Info = osdmetadata
osddevices = append(osddevices, device) osddevices = append(osddevices, device)
} }
} }
} }
} }

View File

@ -6,11 +6,11 @@ import (
) )
type params struct { type params struct {
duration time.Duration duration time.Duration
threadsCount uint64 threadsCount uint64
blocksize, objectsize uint64 blocksize, objectsize uint64
parallel bool parallel bool
bs, os, cluster, user, keyring, config, pool, define, cpuprofile, memprofile string bs, os, cluster, user, keyring, config, pool, define, rdefine, cpuprofile, memprofile string
} }
type cephconnection struct { type cephconnection struct {
@ -391,3 +391,13 @@ type avgLatencies struct {
latencytotal int64 latencytotal int64
len int64 len int64
} }
type placementGroupNautilus struct {
PgReady bool `json:"pg_ready"`
PgStats []PlacementGroup `json:"pg_stats"`
}
type osdStatLine struct {
num int64
line string
}