diff --git a/ceph-gobench.go b/ceph-gobench.go index 45df3ec..0a5955b 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -13,25 +13,7 @@ import ( "time" ) -//future feature -func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 { - var offsets [][]int64 - for i := int64(0); i < threads; i++ { - s1 := rand.NewSource(i) - r1 := rand.New(s1) - localoffsets := make([]int64, 0, objsize-bs) - for i := int64(0); i < objsize-bs; i += bs { - localoffsets = append(localoffsets, i) - } - r1.Shuffle(len(localoffsets), func(i, j int) { - localoffsets[i], localoffsets[j] = localoffsets[j], localoffsets[i] - }) - offsets = append(offsets, localoffsets) - } - return offsets -} - -func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [][]int64, params *Params, +func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, bs int64, objsize int64, params *Params, wg *sync.WaitGroup, result chan string) { defer wg.Done() threadresult := make(chan []time.Duration, params.threadsCount) @@ -43,14 +25,14 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ } }() // calculate object for each thread - for suffix := 0; len(objectnames) < int(params.threadsCount); suffix++ { + for suffix := 0; len(objectnames) < int(params.threadsCount)*16; suffix++ { name := "bench_" + strconv.Itoa(suffix) if osddevice.ID == GetObjActingPrimary(cephconn, *params, name) { objectnames = append(objectnames, name) } } - for i, j := 0, 0; i < int(params.threadsCount); i, j = i+1, j+2 { - go bench_thread(cephconn, osddevice, (*buffs)[j:j+2], offset[i], params, threadresult, objectnames[i]) + for i := 0; i < int(params.threadsCount); i++ { + go bench_thread(cephconn, osddevice, (*buffs)[i*2:i*2+2], bs, objsize, params, threadresult, objectnames[i*16:i*16+16]) } for i := int64(0); i < params.threadsCount; i++ { for _, lat := range <-threadresult { @@ -131,31 +113,27 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [ result <- buffer.String() } -func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, offsets []int64, params *Params, - result chan []time.Duration, objname string) { +func bench_thread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, bs int64, objsize int64, params *Params, + result chan []time.Duration, objnames []string) { starttime := time.Now() var latencies []time.Duration endtime := starttime.Add(params.duration) n := 0 for { - if time.Now().After(endtime) { + offset := rand.Int63n(objsize/bs) * bs + objname := objnames[rand.Int31n(int32(len(objnames)))] + startwritetime := time.Now() + if startwritetime.After(endtime) { break } - for _, offset := range offsets { - if time.Now().Before(endtime) { - startwritetime := time.Now() - err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) - endwritetime := time.Now() - if err != nil { - log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name) - continue - } - latencies = append(latencies, endwritetime.Sub(startwritetime)) - } else { - break - } + err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) + endwritetime := time.Now() + if err != nil { + log.Printf("Can't write obj: %v, osd: %v", objname, osddevice.Name) + continue } + latencies = append(latencies, endwritetime.Sub(startwritetime)) if n == 0 { n++ } else { @@ -184,16 +162,15 @@ func main() { } } osddevices := GetOsds(cephconn, params) - offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize) var wg sync.WaitGroup results := make(chan string, len(osddevices)*int(params.threadsCount)) for _, osd := range osddevices { wg.Add(1) if params.parallel == true { - go bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + go bench(cephconn, osd, &buffs, params.blocksize, params.objectsize, ¶ms, &wg, results) } else { - bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + bench(cephconn, osd, &buffs, params.blocksize, params.objectsize, ¶ms, &wg, results) log.Println(<-results) }