From 052838d2428391d6924e7aaef170b482401c4f31 Mon Sep 17 00:00:00 2001 From: Alexey Kostin Date: Tue, 26 Feb 2019 18:09:59 +0300 Subject: [PATCH] benchmark works --- ceph-gobench.go | 53 ++++++++++++++++++++++++++++++++++++++++++------- flags.go | 5 +++-- getosd.go | 47 ++++++++++++++++++++++--------------------- types.go | 13 ++++++++++++ 4 files changed, 86 insertions(+), 32 deletions(-) diff --git a/ceph-gobench.go b/ceph-gobench.go index 89fb6d9..57543e4 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "math/rand" + "strconv" "sync" "time" ) @@ -30,9 +31,17 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ wg *sync.WaitGroup, result chan []string) { defer wg.Done() threadresult := make(chan string, params.threadsCount) - var osdresults []string - for i := int64(0); i < params.threadsCount; i++ { - go bench_thread(cephconn, osddevice, buffs, offset[i], params, threadresult, i) + var osdresults, objectnames []string + + // calculate object for each thread + for suffix := 0; len(objectnames) < int(params.threadsCount); suffix++ { + name := "bench_" + strconv.Itoa(suffix) + if osddevice.ID == GetObjActingPrimary(cephconn, *params, name) { + objectnames = append(objectnames, name) + } + } + for i, j := 0, 0; i < int(params.threadsCount); i, j = i+1, j+2 { + go bench_thread(cephconn, osddevice, (*buffs)[j:j+2], offset[i], params, threadresult, objectnames[i]) } for i := int64(0); i < params.threadsCount; i++ { osdresults = append(osdresults, <-threadresult) @@ -41,10 +50,39 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ result <- osdresults } -func bench_thread(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset []int64, params *Params, - result chan string, threadnum int64) { - time.Sleep(time.Second * time.Duration(1)) // prepare objects - result <- fmt.Sprintf("Host: %v Osdname: %v Threadnum: %v", osddevice.Info.Hostname, osddevice.Name, threadnum) +func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, offsets []int64, params *Params, + result chan string, objname string) { + defer cephconn.ioctx.Delete(objname) + starttime := time.Now() + var latencies []time.Duration + endtime := starttime.Add(params.duration) + n := 0 + for { + if time.Now().After(endtime) { + break + } + for _, offset := range offsets { + if time.Now().Before(endtime) { + startwritetime := time.Now() + err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) + endwritetime := time.Now() + if err != nil { + log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name) + continue + } + latencies = append(latencies, endwritetime.Sub(startwritetime)) + } else { + break + } + } + if n == 0 { + n++ + } else { + n = 0 + } + } + result <- fmt.Sprintf("Host: %v Osdname: %v Object: %v\n Latencies: %v\n Writes: %v", osddevice.Info.Hostname, + osddevice.Name, objname, latencies, len(latencies)) } func main() { @@ -84,6 +122,7 @@ func main() { if params.parallel == true { go func() { wg.Wait() + time.Sleep(time.Second) close(results) }() diff --git a/flags.go b/flags.go index fc96988..4798ec8 100644 --- a/flags.go +++ b/flags.go @@ -5,13 +5,14 @@ import ( "github.com/juju/gnuflag" "log" "strings" + "time" ) func Route() Params { params := Params{} - gnuflag.DurationVar(¶ms.duration, "duration", 30, + gnuflag.DurationVar(¶ms.duration, "duration", 30*time.Second, "Time limit for each test in seconds") - gnuflag.DurationVar(¶ms.duration, "d", 30, + gnuflag.DurationVar(¶ms.duration, "d", 30*time.Second, "Time limit for each test in seconds") gnuflag.StringVar(¶ms.bs, "blocksize", "4K", "Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") diff --git a/getosd.go b/getosd.go index 44cb471..5f70fa3 100644 --- a/getosd.go +++ b/getosd.go @@ -67,18 +67,22 @@ func GetOsdMetadata(cephconn *Cephconnection) []OsdMetadata { return monanswer } +func GetObjActingPrimary(cephconn *Cephconnection, params Params, objname string) int64 { + monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd map", "pool": params.pool, + "object": objname, "format": "json"}) + var monanswer OsdMap + if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { + log.Fatalf("Can't parse monitor answer. Error: %v", err) + } + return monanswer.UpPrimary +} + func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { var rootbuckets []Bucket for _, bucket := range buckets { if bucket.ID == itemid { if bucket.TypeName == "host" { rootbuckets = append(rootbuckets, bucket) - for _, item := range bucket.Items { - result := GetCrushHostBuckets(buckets, item.ID) - for _, it := range result { - rootbuckets = append(rootbuckets, it) - } - } } else { for _, item := range bucket.Items { result := GetCrushHostBuckets(buckets, item.ID) @@ -92,15 +96,14 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { return rootbuckets } -func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string][]Device { - var crushrule int64 +func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) []Device { + var crushrule, rootid int64 var crushrulename string for _, pool := range osddump.Pools { if pool.Pool == poolinfo.PoolId { crushrule = pool.CrushRule } } - var rootid int64 for _, rule := range osdcrushdump.Rules { if rule.RuleID == crushrule { crushrulename = rule.RuleName @@ -112,7 +115,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } - osdhosts := make(map[string][]Device) + osddevices := []Device{} bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid) if params.define != "" { if strings.HasPrefix(params.define, "osd.") { @@ -123,7 +126,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum for _, osdmetadata := range osdsmetadata { if osdmetadata.ID == device.ID { device.Info = osdmetadata - osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) + osddevices = append(osddevices, device) } } @@ -131,7 +134,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } } - if len(osdhosts) == 0 { + if len(osddevices) == 0 { log.Fatalf("Defined osd not exist in root for rule: %v pool: %v.\nYou should define osd like osd.X", crushrulename, poolinfo.Pool) } @@ -144,7 +147,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum for _, osdmetadata := range osdsmetadata { if osdmetadata.ID == device.ID { device.Info = osdmetadata - osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) + osddevices = append(osddevices, device) } } @@ -153,7 +156,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } } - if len(osdhosts) == 0 { + if len(osddevices) == 0 { log.Fatalf("Defined host not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) } } @@ -168,16 +171,16 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } - osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) + osddevices = append(osddevices, device) } } } } - if len(osdhosts) == 0 { + if len(osddevices) == 0 { log.Fatalf("Osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) } } - return osdhosts + return osddevices } func ContainsPg(pgs []PlacementGroup, i int64) bool { @@ -189,7 +192,7 @@ func ContainsPg(pgs []PlacementGroup, i int64) bool { return false } -func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device { +func GetOsds(cephconn *Cephconnection, params Params) []Device { poolinfo := GetPoolSize(cephconn, params) if poolinfo.Size != 1 { log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1", @@ -200,11 +203,9 @@ func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device { osddump := GetOsdDump(cephconn) osdsmetadata := GetOsdMetadata(cephconn) osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata) - for _, devices := range osddevices { - for _, item := range devices { - if exist := ContainsPg(placementGroups, item.ID); exist == false { - log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num") - } + for _, device := range osddevices { + if exist := ContainsPg(placementGroups, device.ID); exist == false { + log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num") } } return osddevices diff --git a/types.go b/types.go index 107c4fd..446ac8d 100644 --- a/types.go +++ b/types.go @@ -373,3 +373,16 @@ type OsdMetadata struct { OsdObjectstore string `json:"osd_objectstore"` Rotational string `json:"rotational"` } + +type OsdMap struct { + Acting []int64 `json:"acting"` + ActingPrimary int64 `json:"acting_primary"` + Epoch int64 `json:"epoch"` + Objname string `json:"objname"` + Pgid string `json:"pgid"` + Pool string `json:"pool"` + PoolID int64 `json:"pool_id"` + RawPgid string `json:"raw_pgid"` + Up []int64 `json:"up"` + UpPrimary int64 `json:"up_primary"` +}