diff --git a/ceph-gobench.go b/ceph-gobench.go index 45df3ec..aab4d92 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -13,25 +13,7 @@ import ( "time" ) -//future feature -func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 { - var offsets [][]int64 - for i := int64(0); i < threads; i++ { - s1 := rand.NewSource(i) - r1 := rand.New(s1) - localoffsets := make([]int64, 0, objsize-bs) - for i := int64(0); i < objsize-bs; i += bs { - localoffsets = append(localoffsets, i) - } - r1.Shuffle(len(localoffsets), func(i, j int) { - localoffsets[i], localoffsets[j] = localoffsets[j], localoffsets[i] - }) - offsets = append(offsets, localoffsets) - } - return offsets -} - -func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [][]int64, params *Params, +func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, bs int64, objsize int64, params *Params, wg *sync.WaitGroup, result chan string) { defer wg.Done() threadresult := make(chan []time.Duration, params.threadsCount) @@ -43,14 +25,14 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ } }() // calculate object for each thread - for suffix := 0; len(objectnames) < int(params.threadsCount); suffix++ { + for suffix := 0; len(objectnames) < int(params.threadsCount)*16; suffix++ { name := "bench_" + strconv.Itoa(suffix) if osddevice.ID == GetObjActingPrimary(cephconn, *params, name) { objectnames = append(objectnames, name) } } - for i, j := 0, 0; i < int(params.threadsCount); i, j = i+1, j+2 { - go bench_thread(cephconn, osddevice, (*buffs)[j:j+2], offset[i], params, threadresult, objectnames[i]) + for i := 0; i < int(params.threadsCount); i++ { + go bench_thread(cephconn, osddevice, (*buffs)[i*2:i*2+2], bs, objsize, params, threadresult, objectnames[i*16:i*16+16]) } for i := int64(0); i < params.threadsCount; i++ { for _, lat := range <-threadresult { @@ -58,16 +40,25 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ } } close(threadresult) - latencygrade := map[int]int{} + latencygrade := map[int64]int{} + latencytotal := int64(0) for _, lat := range osdlatencies { + micro := lat.Nanoseconds()/1000 + rounded := micro switch { - case lat < time.Millisecond*10: - latencygrade[int(lat.Round(time.Millisecond).Nanoseconds()/1000000)]++ - case lat < time.Millisecond*100: - latencygrade[int(lat.Round(time.Millisecond*10)/1000000)]++ - default: - latencygrade[int(lat.Round(time.Millisecond*100)/1000000)]++ + case micro < 1000: // 0-1ms round to 0.1ms + rounded = (micro/100)*100 + case micro < 10000: // 2-10ms round to 1ms + rounded = (micro/1000)*1000 + case micro < 100000: // 10-100ms round to 10ms + rounded = (micro/10000)*10000 + case micro < 1000000: // 100-1000ms round to 100ms + rounded = (micro/100000)*100000 + default: // 1000+ms round to 1s + rounded = (micro/1000000)*1000000 } + latencytotal += micro + latencygrade[rounded]++ } var buffer bytes.Buffer @@ -100,62 +91,77 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ } } + latencytotal = latencytotal/int64(len(osdlatencies)) + // iops = 1s / latency + iops := 1000000 / latencytotal + // avg speed = iops * block size / 1 MB + avgspeed := (1000000 / float64(latencytotal) * float64(params.blocksize) / 1024 / 1024) + avgline := fmt.Sprintf("Avg iops: %-5v Avg speed: %.3f MB/s\n\n", iops, avgspeed) + switch { + case iops < 80: + buffer.WriteString(darkred(avgline)) + case iops < 200: + buffer.WriteString(red(avgline)) + case iops < 500: + buffer.WriteString(yellow(avgline)) + default: + buffer.WriteString(green(avgline)) + } + //sort latencies - var keys []int + var keys []int64 for k := range latencygrade { keys = append(keys, k) } - sort.Ints(keys) + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) for _, k := range keys { var blocks bytes.Buffer var mseconds string switch { - case 10 <= k && k < 20: - mseconds = green(fmt.Sprintf("[%v-%v]", k, k+9)) - case 20 <= k && k < 100: - mseconds = red(fmt.Sprintf("[%v-%v]", k, k+9)) - case k >= 100: - mseconds = darkred(fmt.Sprintf("[%v-%v]", k, k+99)) + case k < 1000: + mseconds = green(fmt.Sprintf("[%.1f-%.1f)", float64(k)/1000, 0.1+float64(k)/1000)) + case k < 2000: + mseconds = yellow(fmt.Sprintf("[%.1f-%.1f)", float64(k)/1000, 0.1+float64(k)/1000)) + case k < 10000: + mseconds = yellow(fmt.Sprintf("[%3v-%3v)", k/1000, 1+k/1000)) + case k < 100000: + mseconds = red(fmt.Sprintf("[%3v-%3v)", k/1000, 10+k/1000)) + case k < 1000000: + mseconds = darkred(fmt.Sprintf("[%3v-%3v]", k/1000, 99+k/1000)) default: - mseconds = green(k) + mseconds = darkred(fmt.Sprintf("[%2vs-%2vs]", k/1000000, 1+k/1000000)) } for i := 0; i < 50*(latencygrade[k]*100/len(osdlatencies))/100; i++ { blocks.WriteString("#") } - iops := latencygrade[k] / int(params.duration.Seconds()) - avgspeed := (float64(latencygrade[k]) * float64(params.blocksize) / float64(params.duration.Seconds())) / 1024 / 1024 //mb/sec megabyteswritten := (float64(latencygrade[k]) * float64(params.blocksize)) / 1024 / 1024 - buffer.WriteString(fmt.Sprintf("%+9v ms: [%-50v] Count: %-5v IOPS: %-5v Avg speed: %-6.3f Mb/Sec Summary written: %6.3f Mb\n", - mseconds, blocks.String(), latencygrade[k], iops, avgspeed, megabyteswritten)) + buffer.WriteString(fmt.Sprintf("%+9v ms: [%-50v] Count: %-5v Total written: %6.3f MB\n", + mseconds, blocks.String(), latencygrade[k], megabyteswritten)) } result <- buffer.String() } -func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, offsets []int64, params *Params, - result chan []time.Duration, objname string) { +func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, bs int64, objsize int64, params *Params, + result chan []time.Duration, objnames []string) { starttime := time.Now() var latencies []time.Duration endtime := starttime.Add(params.duration) n := 0 for { - if time.Now().After(endtime) { + offset := rand.Int63n(objsize/bs) * bs + objname := objnames[rand.Int31n(int32(len(objnames)))] + startwritetime := time.Now() + if startwritetime.After(endtime) { break } - for _, offset := range offsets { - if time.Now().Before(endtime) { - startwritetime := time.Now() - err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) - endwritetime := time.Now() - if err != nil { - log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name) - continue - } - latencies = append(latencies, endwritetime.Sub(startwritetime)) - } else { - break - } + err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) + endwritetime := time.Now() + if err != nil { + log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name) + continue } + latencies = append(latencies, endwritetime.Sub(startwritetime)) if n == 0 { n++ } else { @@ -184,16 +190,15 @@ func main() { } } osddevices := GetOsds(cephconn, params) - offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize) var wg sync.WaitGroup results := make(chan string, len(osddevices)*int(params.threadsCount)) for _, osd := range osddevices { wg.Add(1) if params.parallel == true { - go bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + go bench(cephconn, osd, &buffs, params.blocksize, params.objectsize, ¶ms, &wg, results) } else { - bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + bench(cephconn, osd, &buffs, params.blocksize, params.objectsize, ¶ms, &wg, results) log.Println(<-results) }