benchmark works

master
Alexey Kostin 2019-02-26 18:09:59 +03:00
parent df824e8dc7
commit 052838d242
4 changed files with 86 additions and 32 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"log" "log"
"math/rand" "math/rand"
"strconv"
"sync" "sync"
"time" "time"
) )
@ -30,9 +31,17 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [
wg *sync.WaitGroup, result chan []string) { wg *sync.WaitGroup, result chan []string) {
defer wg.Done() defer wg.Done()
threadresult := make(chan string, params.threadsCount) threadresult := make(chan string, params.threadsCount)
var osdresults []string var osdresults, objectnames []string
for i := int64(0); i < params.threadsCount; i++ {
go bench_thread(cephconn, osddevice, buffs, offset[i], params, threadresult, i) // calculate object for each thread
for suffix := 0; len(objectnames) < int(params.threadsCount); suffix++ {
name := "bench_" + strconv.Itoa(suffix)
if osddevice.ID == GetObjActingPrimary(cephconn, *params, name) {
objectnames = append(objectnames, name)
}
}
for i, j := 0, 0; i < int(params.threadsCount); i, j = i+1, j+2 {
go bench_thread(cephconn, osddevice, (*buffs)[j:j+2], offset[i], params, threadresult, objectnames[i])
} }
for i := int64(0); i < params.threadsCount; i++ { for i := int64(0); i < params.threadsCount; i++ {
osdresults = append(osdresults, <-threadresult) osdresults = append(osdresults, <-threadresult)
@ -41,10 +50,39 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [
result <- osdresults result <- osdresults
} }
func bench_thread(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset []int64, params *Params, func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, offsets []int64, params *Params,
result chan string, threadnum int64) { result chan string, objname string) {
time.Sleep(time.Second * time.Duration(1)) // prepare objects defer cephconn.ioctx.Delete(objname)
result <- fmt.Sprintf("Host: %v Osdname: %v Threadnum: %v", osddevice.Info.Hostname, osddevice.Name, threadnum) starttime := time.Now()
var latencies []time.Duration
endtime := starttime.Add(params.duration)
n := 0
for {
if time.Now().After(endtime) {
break
}
for _, offset := range offsets {
if time.Now().Before(endtime) {
startwritetime := time.Now()
err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset))
endwritetime := time.Now()
if err != nil {
log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name)
continue
}
latencies = append(latencies, endwritetime.Sub(startwritetime))
} else {
break
}
}
if n == 0 {
n++
} else {
n = 0
}
}
result <- fmt.Sprintf("Host: %v Osdname: %v Object: %v\n Latencies: %v\n Writes: %v", osddevice.Info.Hostname,
osddevice.Name, objname, latencies, len(latencies))
} }
func main() { func main() {
@ -84,6 +122,7 @@ func main() {
if params.parallel == true { if params.parallel == true {
go func() { go func() {
wg.Wait() wg.Wait()
time.Sleep(time.Second)
close(results) close(results)
}() }()

View File

@ -5,13 +5,14 @@ import (
"github.com/juju/gnuflag" "github.com/juju/gnuflag"
"log" "log"
"strings" "strings"
"time"
) )
func Route() Params { func Route() Params {
params := Params{} params := Params{}
gnuflag.DurationVar(&params.duration, "duration", 30, gnuflag.DurationVar(&params.duration, "duration", 30*time.Second,
"Time limit for each test in seconds") "Time limit for each test in seconds")
gnuflag.DurationVar(&params.duration, "d", 30, gnuflag.DurationVar(&params.duration, "d", 30*time.Second,
"Time limit for each test in seconds") "Time limit for each test in seconds")
gnuflag.StringVar(&params.bs, "blocksize", "4K", gnuflag.StringVar(&params.bs, "blocksize", "4K",
"Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") "Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G")

View File

@ -67,18 +67,22 @@ func GetOsdMetadata(cephconn *Cephconnection) []OsdMetadata {
return monanswer return monanswer
} }
func GetObjActingPrimary(cephconn *Cephconnection, params Params, objname string) int64 {
monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd map", "pool": params.pool,
"object": objname, "format": "json"})
var monanswer OsdMap
if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil {
log.Fatalf("Can't parse monitor answer. Error: %v", err)
}
return monanswer.UpPrimary
}
func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket {
var rootbuckets []Bucket var rootbuckets []Bucket
for _, bucket := range buckets { for _, bucket := range buckets {
if bucket.ID == itemid { if bucket.ID == itemid {
if bucket.TypeName == "host" { if bucket.TypeName == "host" {
rootbuckets = append(rootbuckets, bucket) rootbuckets = append(rootbuckets, bucket)
for _, item := range bucket.Items {
result := GetCrushHostBuckets(buckets, item.ID)
for _, it := range result {
rootbuckets = append(rootbuckets, it)
}
}
} else { } else {
for _, item := range bucket.Items { for _, item := range bucket.Items {
result := GetCrushHostBuckets(buckets, item.ID) result := GetCrushHostBuckets(buckets, item.ID)
@ -92,15 +96,14 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket {
return rootbuckets return rootbuckets
} }
func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string][]Device { func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) []Device {
var crushrule int64 var crushrule, rootid int64
var crushrulename string var crushrulename string
for _, pool := range osddump.Pools { for _, pool := range osddump.Pools {
if pool.Pool == poolinfo.PoolId { if pool.Pool == poolinfo.PoolId {
crushrule = pool.CrushRule crushrule = pool.CrushRule
} }
} }
var rootid int64
for _, rule := range osdcrushdump.Rules { for _, rule := range osdcrushdump.Rules {
if rule.RuleID == crushrule { if rule.RuleID == crushrule {
crushrulename = rule.RuleName crushrulename = rule.RuleName
@ -112,7 +115,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
} }
} }
osdhosts := make(map[string][]Device) osddevices := []Device{}
bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid) bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid)
if params.define != "" { if params.define != "" {
if strings.HasPrefix(params.define, "osd.") { if strings.HasPrefix(params.define, "osd.") {
@ -123,7 +126,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
for _, osdmetadata := range osdsmetadata { for _, osdmetadata := range osdsmetadata {
if osdmetadata.ID == device.ID { if osdmetadata.ID == device.ID {
device.Info = osdmetadata device.Info = osdmetadata
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) osddevices = append(osddevices, device)
} }
} }
@ -131,7 +134,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
} }
} }
} }
if len(osdhosts) == 0 { if len(osddevices) == 0 {
log.Fatalf("Defined osd not exist in root for rule: %v pool: %v.\nYou should define osd like osd.X", log.Fatalf("Defined osd not exist in root for rule: %v pool: %v.\nYou should define osd like osd.X",
crushrulename, poolinfo.Pool) crushrulename, poolinfo.Pool)
} }
@ -144,7 +147,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
for _, osdmetadata := range osdsmetadata { for _, osdmetadata := range osdsmetadata {
if osdmetadata.ID == device.ID { if osdmetadata.ID == device.ID {
device.Info = osdmetadata device.Info = osdmetadata
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) osddevices = append(osddevices, device)
} }
} }
@ -153,7 +156,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
} }
} }
} }
if len(osdhosts) == 0 { if len(osddevices) == 0 {
log.Fatalf("Defined host not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) log.Fatalf("Defined host not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool)
} }
} }
@ -168,16 +171,16 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
} }
} }
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) osddevices = append(osddevices, device)
} }
} }
} }
} }
if len(osdhosts) == 0 { if len(osddevices) == 0 {
log.Fatalf("Osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) log.Fatalf("Osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool)
} }
} }
return osdhosts return osddevices
} }
func ContainsPg(pgs []PlacementGroup, i int64) bool { func ContainsPg(pgs []PlacementGroup, i int64) bool {
@ -189,7 +192,7 @@ func ContainsPg(pgs []PlacementGroup, i int64) bool {
return false return false
} }
func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device { func GetOsds(cephconn *Cephconnection, params Params) []Device {
poolinfo := GetPoolSize(cephconn, params) poolinfo := GetPoolSize(cephconn, params)
if poolinfo.Size != 1 { if poolinfo.Size != 1 {
log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1", log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1",
@ -200,11 +203,9 @@ func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device {
osddump := GetOsdDump(cephconn) osddump := GetOsdDump(cephconn)
osdsmetadata := GetOsdMetadata(cephconn) osdsmetadata := GetOsdMetadata(cephconn)
osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata) osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata)
for _, devices := range osddevices { for _, device := range osddevices {
for _, item := range devices { if exist := ContainsPg(placementGroups, device.ID); exist == false {
if exist := ContainsPg(placementGroups, item.ID); exist == false { log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num")
log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num")
}
} }
} }
return osddevices return osddevices

View File

@ -373,3 +373,16 @@ type OsdMetadata struct {
OsdObjectstore string `json:"osd_objectstore"` OsdObjectstore string `json:"osd_objectstore"`
Rotational string `json:"rotational"` Rotational string `json:"rotational"`
} }
type OsdMap struct {
Acting []int64 `json:"acting"`
ActingPrimary int64 `json:"acting_primary"`
Epoch int64 `json:"epoch"`
Objname string `json:"objname"`
Pgid string `json:"pgid"`
Pool string `json:"pool"`
PoolID int64 `json:"pool_id"`
RawPgid string `json:"raw_pgid"`
Up []int64 `json:"up"`
UpPrimary int64 `json:"up_primary"`
}