diff --git a/ceph-gobench.go b/ceph-gobench.go index 28c0a6c..0d1020d 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -4,19 +4,56 @@ import ( "fmt" "log" "math/rand" + "sync" "time" ) //future feature -func makepreudorandom() { - a := make([]int, 0, 4096/4) - for i := 0; i < 4096; i += 4 { - a = append(a, i) +func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 { + var offsets [][]int64 + for i := int64(0); i < threads; i++ { + s1 := rand.NewSource(i) + r1 := rand.New(s1) + localoffsets := make([]int64, 0, objsize-bs) + for i := int64(0); i < objsize-bs; i += bs { + localoffsets = append(localoffsets, i) + } + r1.Shuffle(len(localoffsets), func(i, j int) { + localoffsets[i], localoffsets[j] = localoffsets[j], localoffsets[i] + }) + offsets = append(offsets, localoffsets) } - rand.Shuffle(len(a), func(i, j int) { - a[i], a[j] = a[j], a[i] - }) - fmt.Println(a) + return offsets +} + +func bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset [][]int64, params *Params, + wg *sync.WaitGroup, ready chan bool, result chan string, allready uint64) { + var nwg sync.WaitGroup + tready := make(chan bool, params.threadsCount) + start := uint64(0) + defer wg.Done() + for i := int64(0); i < params.threadsCount; i++ { + nwg.Add(1) + go _bench(cephconn, osddevice, host, buffs, offset[i], params, &nwg, i, tready, result, &start) + if params.parallel != true { + nwg.Wait() + } + } + nwg.Wait() +} + +func _bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset []int64, params *Params, + wg *sync.WaitGroup, i int64, ready chan bool, result chan string, start *uint64) { + defer wg.Done() + time.Sleep(time.Second * time.Duration(i)) // prepare objects + ready <- true + for { + if *start == 1 { + break + } + } + log.Println(host, i, osddevice.Name) //somework + result <- fmt.Sprintf("Host: %v\nOsdname: %v", host, osddevice.Name) } func main() { @@ -28,7 +65,7 @@ func main() { time.Sleep(time.Millisecond * 100) var buffs [][]byte - for i := 0; i < 2*params.threadsCount; i++ { + for i := int64(0); i < 2*params.threadsCount; i++ { buffs = append(buffs, make([]byte, params.blocksize)) } for num := range buffs { @@ -37,6 +74,23 @@ func main() { log.Fatalln(err) } } + osddevices := GetOsds(cephconn, params) + offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize) + + var wg sync.WaitGroup + var allready uint64 + var ready chan bool + var result chan string + for host, osds := range osddevices { + for _, osd := range osds { + wg.Add(1) + if params.parallel == true { + go bench(cephconn, osd, host, &buffs, offsets, ¶ms, &wg, ready, result, allready) + } else { + bench(cephconn, osd, host, &buffs, offsets, ¶ms, &wg, ready, result, allready) + } + } + } + wg.Wait() - GetOsds(cephconn, params) } diff --git a/flags.go b/flags.go index c37f2c8..fc96988 100644 --- a/flags.go +++ b/flags.go @@ -17,6 +17,10 @@ func Route() Params { "Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") gnuflag.StringVar(¶ms.bs, "s", "4K", "Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") + gnuflag.StringVar(¶ms.os, "objectsize", "4M", + "Object size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") + gnuflag.StringVar(¶ms.os, "o", "4M", + "Object size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G") gnuflag.StringVar(¶ms.user, "user", "admin", "Ceph user (cephx)") gnuflag.StringVar(¶ms.user, "u", "client.admin", @@ -39,23 +43,34 @@ func Route() Params { "Ceph pool") gnuflag.StringVar(¶ms.define, "define", "", "Define specifically osd or host. osd.X or ceph-host-X") - gnuflag.IntVar(¶ms.threadsCount, "threads", 1, + gnuflag.Int64Var(¶ms.threadsCount, "threads", 1, "Threads count") - gnuflag.IntVar(¶ms.threadsCount, "t", 1, + gnuflag.Int64Var(¶ms.threadsCount, "t", 1, "Threads count on each osd") gnuflag.BoolVar(¶ms.parallel, "parallel", false, "Do test all osd in parallel mode") gnuflag.Parse(true) - var err error + if params.mode == "osd" && len(params.define) != 0 { if i := strings.HasPrefix(params.define, "osd."); i != true { log.Fatalln("Define correct osd in format osd.X") } } - params.blocksize, err = bytefmt.ToBytes(params.bs) + blocksize, err := bytefmt.ToBytes(params.bs) + params.blocksize = int64(blocksize) if err != nil { log.Println("Can't convert defined block size. 4K block size will be used") params.blocksize = 4096 } + //uint64(params.objectsize), err = bytefmt.ToBytes(params.os) + objsize, err := bytefmt.ToBytes(params.os) + params.objectsize = int64(objsize) + if err != nil { + log.Println("Can't convert defined block size. 4K block size will be used") + params.objectsize = 4194304 + } + if params.objectsize/params.blocksize < 2 { + log.Fatalf("Current block size: %v\nCurrent object size: %v\nObject size must be at least 2 times bigger than block size", params.blocksize, params.objectsize) + } return params } diff --git a/getosd.go b/getosd.go index 6d6734c..f16c185 100644 --- a/getosd.go +++ b/getosd.go @@ -92,7 +92,7 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { return rootbuckets } -func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string]BenchOsd { +func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string][]Device { var crushrule int64 var crushrulename string for _, pool := range osddump.Pools { @@ -112,8 +112,8 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } - osdhosts := make(map[string]BenchOsd) - var devices []Device + osdhosts := make(map[string][]Device) + //var devices []Device bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid) if params.define != "" { if strings.HasPrefix(params.define, "osd.") { @@ -124,17 +124,13 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum for _, osdmetadata := range osdsmetadata { if osdmetadata.ID == device.ID { device.Info = osdmetadata + osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) } } - devices = append(devices, device) } } } - if len(devices) != 0 { - osdhosts[hostbucket.Name] = BenchOsd{Osds: devices} - devices = []Device{} - } } if len(osdhosts) == 0 { log.Fatalf("Defined osd not exist in root for rule: %v pool: %v.\nYou should define osd like osd.X", @@ -149,18 +145,14 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum for _, osdmetadata := range osdsmetadata { if osdmetadata.ID == device.ID { device.Info = osdmetadata + osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) } } - devices = append(devices, device) } } } } - if len(devices) != 0 { - osdhosts[hostbucket.Name] = BenchOsd{Osds: devices} - devices = []Device{} - } } if len(osdhosts) == 0 { log.Fatalf("Defined host not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) @@ -177,14 +169,10 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } } - devices = append(devices, device) + osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device) } } } - if len(devices) != 0 { - osdhosts[hostbucket.Name] = BenchOsd{Osds: devices} - devices = []Device{} - } } if len(osdhosts) == 0 { log.Fatalf("Osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool) @@ -202,7 +190,7 @@ func ContainsPg(pgs []PlacementGroup, i int64) bool { return false } -func GetOsds(cephconn *Cephconnection, params Params) map[string]BenchOsd { +func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device { poolinfo := GetPoolSize(cephconn, params) if poolinfo.Size != 1 { log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1", @@ -213,8 +201,8 @@ func GetOsds(cephconn *Cephconnection, params Params) map[string]BenchOsd { osddump := GetOsdDump(cephconn) osdsmetadata := GetOsdMetadata(cephconn) osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata) - for _, values := range osddevices { - for _, item := range values.Osds { + for _, devices := range osddevices { + for _, item := range devices { if exist := ContainsPg(placementGroups, item.ID); exist == false { log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num") } diff --git a/types.go b/types.go index 7182df5..107c4fd 100644 --- a/types.go +++ b/types.go @@ -6,11 +6,11 @@ import ( ) type Params struct { - duration time.Duration - threadsCount int - blocksize uint64 - parallel bool - bs, cluster, user, keyring, config, pool, mode, define string + duration time.Duration + threadsCount int64 + blocksize, objectsize int64 + parallel bool + bs, os, cluster, user, keyring, config, pool, mode, define string } type Cephconnection struct { @@ -373,9 +373,3 @@ type OsdMetadata struct { OsdObjectstore string `json:"osd_objectstore"` Rotational string `json:"rotational"` } - -type BenchOsd struct { - Osds []Device - Buffs *[][]byte - Offsets []int64 -}