diff --git a/ceph-gobench.go b/ceph-gobench.go index a1eaa07..89fb6d9 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -26,26 +26,25 @@ func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 { return offsets } -func bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset [][]int64, params *Params, - wg *sync.WaitGroup, result chan string) { - var nwg sync.WaitGroup +func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [][]int64, params *Params, + wg *sync.WaitGroup, result chan []string) { defer wg.Done() + threadresult := make(chan string, params.threadsCount) + var osdresults []string for i := int64(0); i < params.threadsCount; i++ { - nwg.Add(1) - go _bench(cephconn, osddevice, host, buffs, offset[i], params, &nwg, result) - if params.parallel != true { - nwg.Wait() - } + go bench_thread(cephconn, osddevice, buffs, offset[i], params, threadresult, i) } - nwg.Wait() + for i := int64(0); i < params.threadsCount; i++ { + osdresults = append(osdresults, <-threadresult) + } + close(threadresult) + result <- osdresults } -func _bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset []int64, params *Params, - wg *sync.WaitGroup, result chan string) { - defer wg.Done() +func bench_thread(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset []int64, params *Params, + result chan string, threadnum int64) { time.Sleep(time.Second * time.Duration(1)) // prepare objects - log.Println(host, osddevice.Name) //somework - result <- fmt.Sprintf("Host: %v\nOsdname: %v", host, osddevice.Name) + result <- fmt.Sprintf("Host: %v Osdname: %v Threadnum: %v", osddevice.Info.Hostname, osddevice.Name, threadnum) } func main() { @@ -70,17 +69,27 @@ func main() { offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize) var wg sync.WaitGroup - result := make(chan string, len(offsets)) - for host, osds := range osddevices { - for _, osd := range osds { - wg.Add(1) - if params.parallel == true { - go bench(cephconn, osd, host, &buffs, offsets, ¶ms, &wg, result) - } else { - bench(cephconn, osd, host, &buffs, offsets, ¶ms, &wg, result) - } + results := make(chan []string, len(osddevices)*int(params.threadsCount)) + for _, osd := range osddevices { + wg.Add(1) + if params.parallel == true { + go bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + } else { + bench(cephconn, osd, &buffs, offsets, ¶ms, &wg, results) + log.Printf("%v \n", <-results) + } + + } + + if params.parallel == true { + go func() { + wg.Wait() + close(results) + }() + + for message := range results { + log.Printf("%v \n", message) } } - wg.Wait() }