Easy parallels

master
Alexey Kostin 2019-02-26 13:20:51 +03:00
parent 34352241a3
commit df824e8dc7
1 changed files with 33 additions and 24 deletions

View File

@ -26,26 +26,25 @@ func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 {
return offsets return offsets
} }
func bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset [][]int64, params *Params, func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset [][]int64, params *Params,
wg *sync.WaitGroup, result chan string) { wg *sync.WaitGroup, result chan []string) {
var nwg sync.WaitGroup
defer wg.Done() defer wg.Done()
threadresult := make(chan string, params.threadsCount)
var osdresults []string
for i := int64(0); i < params.threadsCount; i++ { for i := int64(0); i < params.threadsCount; i++ {
nwg.Add(1) go bench_thread(cephconn, osddevice, buffs, offset[i], params, threadresult, i)
go _bench(cephconn, osddevice, host, buffs, offset[i], params, &nwg, result)
if params.parallel != true {
nwg.Wait()
}
} }
nwg.Wait() for i := int64(0); i < params.threadsCount; i++ {
osdresults = append(osdresults, <-threadresult)
}
close(threadresult)
result <- osdresults
} }
func _bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset []int64, params *Params, func bench_thread(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, offset []int64, params *Params,
wg *sync.WaitGroup, result chan string) { result chan string, threadnum int64) {
defer wg.Done()
time.Sleep(time.Second * time.Duration(1)) // prepare objects time.Sleep(time.Second * time.Duration(1)) // prepare objects
log.Println(host, osddevice.Name) //somework result <- fmt.Sprintf("Host: %v Osdname: %v Threadnum: %v", osddevice.Info.Hostname, osddevice.Name, threadnum)
result <- fmt.Sprintf("Host: %v\nOsdname: %v", host, osddevice.Name)
} }
func main() { func main() {
@ -70,17 +69,27 @@ func main() {
offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize) offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize)
var wg sync.WaitGroup var wg sync.WaitGroup
result := make(chan string, len(offsets)) results := make(chan []string, len(osddevices)*int(params.threadsCount))
for host, osds := range osddevices { for _, osd := range osddevices {
for _, osd := range osds { wg.Add(1)
wg.Add(1) if params.parallel == true {
if params.parallel == true { go bench(cephconn, osd, &buffs, offsets, &params, &wg, results)
go bench(cephconn, osd, host, &buffs, offsets, &params, &wg, result) } else {
} else { bench(cephconn, osd, &buffs, offsets, &params, &wg, results)
bench(cephconn, osd, host, &buffs, offsets, &params, &wg, result) log.Printf("%v \n", <-results)
} }
}
if params.parallel == true {
go func() {
wg.Wait()
close(results)
}()
for message := range results {
log.Printf("%v \n", message)
} }
} }
wg.Wait()
} }