Move profiler to main

master
Alexey Kostin 2019-03-03 23:54:51 +03:00
parent 1d0b139a3a
commit ad14e37433
1 changed files with 9 additions and 24 deletions

View File

@ -16,7 +16,7 @@ import (
"time" "time"
) )
func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, startbuff *[]byte, params *Params, func bench(cephconn *Cephconnection, osddevice Device, buff *[]byte, startbuff *[]byte, params *Params,
wg *sync.WaitGroup, result chan string) { wg *sync.WaitGroup, result chan string) {
defer wg.Done() defer wg.Done()
threadresult := make(chan []time.Duration, params.threadsCount) threadresult := make(chan []time.Duration, params.threadsCount)
@ -41,7 +41,7 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, startbuf
} }
} }
for i := 0; i < int(params.threadsCount); i++ { for i := 0; i < int(params.threadsCount); i++ {
go BenchThread(cephconn, osddevice, (*buffs)[i*2:i*2+2], params, threadresult, objectnames[i*16:i*16+16]) go BenchThread(cephconn, osddevice, params, buff, threadresult, objectnames[i*16:i*16+16])
} }
for i := uint64(0); i < params.threadsCount; i++ { for i := uint64(0); i < params.threadsCount; i++ {
for _, lat := range <-threadresult { for _, lat := range <-threadresult {
@ -174,13 +174,11 @@ func bench(cephconn *Cephconnection, osddevice Device, buffs *[][]byte, startbuf
result <- buffer.String() result <- buffer.String()
} }
func BenchThread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, params *Params, func BenchThread(cephconn *Cephconnection, osddevice Device, params *Params, buff *[]byte,
result chan []time.Duration, objnames []string) { result chan []time.Duration, objnames []string) {
starttime := time.Now()
var latencies []time.Duration var latencies []time.Duration
starttime := time.Now()
endtime := starttime.Add(params.duration) endtime := starttime.Add(params.duration)
n := 0
for { for {
offset := rand.Int63n(int64(params.objectsize/params.blocksize)) * int64(params.blocksize) offset := rand.Int63n(int64(params.objectsize/params.blocksize)) * int64(params.blocksize)
objname := objnames[rand.Int31n(int32(len(objnames)))] objname := objnames[rand.Int31n(int32(len(objnames)))]
@ -188,18 +186,13 @@ func BenchThread(cephconn *Cephconnection, osddevice Device, buffs [][]byte, par
if startwritetime.After(endtime) { if startwritetime.After(endtime) {
break break
} }
err := cephconn.ioctx.Write(objname, buffs[n], uint64(offset)) err := cephconn.ioctx.Write(objname, *buff, uint64(offset))
endwritetime := time.Now() endwritetime := time.Now()
if err != nil { if err != nil {
log.Printf("Can't write object: %v, osd: %v", objname, osddevice.Name) log.Printf("Can't write object: %v, osd: %v", objname, osddevice.Name)
continue continue
} }
latencies = append(latencies, endwritetime.Sub(startwritetime)) latencies = append(latencies, endwritetime.Sub(startwritetime))
if n == 0 {
n++
} else {
n = 0
}
} }
result <- latencies result <- latencies
} }
@ -235,27 +228,19 @@ func main() {
// https://tracker.ceph.com/issues/24114 // https://tracker.ceph.com/issues/24114
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond * 100)
var buffs [][]byte
for i := uint64(0); i < 2*params.threadsCount; i++ {
buffs = append(buffs, make([]byte, params.blocksize))
}
startbuff := make([]byte, 4096) startbuff := make([]byte, 4096)
for num := range buffs {
_, err := rand.Read(buffs[num])
if err != nil {
log.Fatalln(err)
}
}
osddevices := GetOsds(cephconn, params) osddevices := GetOsds(cephconn, params)
buff := make([]byte, params.blocksize)
rand.Read(buff)
var wg sync.WaitGroup var wg sync.WaitGroup
results := make(chan string, len(osddevices)*int(params.threadsCount)) results := make(chan string, len(osddevices)*int(params.threadsCount))
for _, osd := range osddevices { for _, osd := range osddevices {
wg.Add(1) wg.Add(1)
if params.parallel == true { if params.parallel == true {
go bench(cephconn, osd, &buffs, &startbuff, &params, &wg, results) go bench(cephconn, osd, &buff, &startbuff, &params, &wg, results)
} else { } else {
bench(cephconn, osd, &buffs, &startbuff, &params, &wg, results) bench(cephconn, osd, &buff, &startbuff, &params, &wg, results)
log.Println(<-results) log.Println(<-results)
} }