Revert "Struct pointers"

This reverts commit b81a43bf
master
Alexey Kostin 2019-02-22 17:58:31 +03:00
parent a7bb9da78a
commit ba0e8df3a7
4 changed files with 97 additions and 46 deletions

View File

@ -4,19 +4,56 @@ import (
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// makeoffsets builds one shuffled list of block-aligned offsets per thread.
// Thread n gets the offsets 0, bs, 2*bs, ... strictly below objsize-bs,
// permuted with a deterministic per-thread seed (rand.NewSource(n)) so that
// repeated runs produce the same access pattern. The returned slice has
// exactly `threads` entries.
func makeoffsets(threads int64, bs int64, objsize int64) [][]int64 {
	offsets := make([][]int64, 0, threads)
	for n := int64(0); n < threads; n++ {
		// Deterministic per-thread source: same seed -> same shuffle order.
		r := rand.New(rand.NewSource(n))
		// The loop appends one entry per block, so size the slice by the
		// block count, not the byte count (capacity objsize-bs would
		// over-allocate by a factor of bs).
		local := make([]int64, 0, (objsize-bs+bs-1)/bs)
		for off := int64(0); off < objsize-bs; off += bs {
			local = append(local, off)
		}
		r.Shuffle(len(local), func(i, j int) {
			local[i], local[j] = local[j], local[i]
		})
		offsets = append(offsets, local)
	}
	return offsets
}
// bench fans out params.threadsCount worker goroutines (_bench) against a
// single OSD device and blocks until all of them finish.
//
// In parallel mode all workers run concurrently; otherwise each worker is
// awaited before the next one is launched. `allready` is currently unused.
// NOTE(review): `start` is initialized to 0 and never set to 1 anywhere in
// this function, so the spin loop in _bench cannot terminate — confirm which
// component is supposed to flip it.
func bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset [][]int64, params *Params,
	wg *sync.WaitGroup, ready chan bool, result chan string, allready uint64) {
	var nwg sync.WaitGroup
	tready := make(chan bool, params.threadsCount)
	start := uint64(0)
	defer wg.Done()
	for i := int64(0); i < params.threadsCount; i++ {
		nwg.Add(1)
		go _bench(cephconn, osddevice, host, buffs, offset[i], params, &nwg, i, tready, result, &start)
		// Sequential mode: wait for this worker before launching the next.
		if !params.parallel {
			nwg.Wait()
		}
	}
	nwg.Wait()
}
// _bench is a single benchmark worker. It sleeps i seconds (staggered object
// preparation), signals readiness on `ready`, spins until *start becomes 1,
// then does its placeholder work and reports a summary line on `result`.
func _bench(cephconn *Cephconnection, osddevice Device, host string, buffs *[][]byte, offset []int64, params *Params,
	wg *sync.WaitGroup, i int64, ready chan bool, result chan string, start *uint64) {
	defer wg.Done()
	time.Sleep(time.Second * time.Duration(i)) // prepare objects
	ready <- true
	// Spin until the coordinator flips *start to 1. The read must be atomic:
	// a plain load of a value written by another goroutine is a data race,
	// and the compiler is free to hoist it so the update is never observed.
	for atomic.LoadUint64(start) != 1 {
	}
	log.Println(host, i, osddevice.Name) //somework
	result <- fmt.Sprintf("Host: %v\nOsdname: %v", host, osddevice.Name)
}
func main() {
@ -28,7 +65,7 @@ func main() {
time.Sleep(time.Millisecond * 100)
var buffs [][]byte
for i := 0; i < 2*params.threadsCount; i++ {
for i := int64(0); i < 2*params.threadsCount; i++ {
buffs = append(buffs, make([]byte, params.blocksize))
}
for num := range buffs {
@ -37,6 +74,23 @@ func main() {
log.Fatalln(err)
}
}
osddevices := GetOsds(cephconn, params)
offsets := makeoffsets(params.threadsCount, params.blocksize, params.objectsize)
var wg sync.WaitGroup
var allready uint64
var ready chan bool
var result chan string
for host, osds := range osddevices {
for _, osd := range osds {
wg.Add(1)
if params.parallel == true {
go bench(cephconn, osd, host, &buffs, offsets, &params, &wg, ready, result, allready)
} else {
bench(cephconn, osd, host, &buffs, offsets, &params, &wg, ready, result, allready)
}
}
}
wg.Wait()
GetOsds(cephconn, params)
}

View File

@ -17,6 +17,10 @@ func Route() Params {
"Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G")
gnuflag.StringVar(&params.bs, "s", "4K",
"Block size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G")
gnuflag.StringVar(&params.os, "objectsize", "4M",
"Object size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G")
gnuflag.StringVar(&params.os, "o", "4M",
"Object size in format KB = K = KiB = 1024 MB = M = MiB = 1024 * K GB = G = GiB = 1024 * M TB = T = TiB = 1024 * G")
gnuflag.StringVar(&params.user, "user", "admin",
"Ceph user (cephx)")
gnuflag.StringVar(&params.user, "u", "client.admin",
@ -39,23 +43,34 @@ func Route() Params {
"Ceph pool")
gnuflag.StringVar(&params.define, "define", "",
"Define specifically osd or host. osd.X or ceph-host-X")
gnuflag.IntVar(&params.threadsCount, "threads", 1,
gnuflag.Int64Var(&params.threadsCount, "threads", 1,
"Threads count")
gnuflag.IntVar(&params.threadsCount, "t", 1,
gnuflag.Int64Var(&params.threadsCount, "t", 1,
"Threads count on each osd")
gnuflag.BoolVar(&params.parallel, "parallel", false,
"Do test all osd in parallel mode")
gnuflag.Parse(true)
var err error
if params.mode == "osd" && len(params.define) != 0 {
if i := strings.HasPrefix(params.define, "osd."); i != true {
log.Fatalln("Define correct osd in format osd.X")
}
}
params.blocksize, err = bytefmt.ToBytes(params.bs)
blocksize, err := bytefmt.ToBytes(params.bs)
params.blocksize = int64(blocksize)
if err != nil {
log.Println("Can't convert defined block size. 4K block size will be used")
params.blocksize = 4096
}
//uint64(params.objectsize), err = bytefmt.ToBytes(params.os)
objsize, err := bytefmt.ToBytes(params.os)
params.objectsize = int64(objsize)
if err != nil {
log.Println("Can't convert defined block size. 4K block size will be used")
params.objectsize = 4194304
}
if params.objectsize/params.blocksize < 2 {
log.Fatalf("Current block size: %v\nCurrent object size: %v\nObject size must be at least 2 times bigger than block size", params.blocksize, params.objectsize)
}
return params
}

View File

@ -92,7 +92,7 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket {
return rootbuckets
}
func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string]BenchOsd {
func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) map[string][]Device {
var crushrule int64
var crushrulename string
for _, pool := range osddump.Pools {
@ -112,8 +112,8 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
}
}
osdhosts := make(map[string]BenchOsd)
var devices []Device
osdhosts := make(map[string][]Device)
//var devices []Device
bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid)
if params.define != "" {
if strings.HasPrefix(params.define, "osd.") {
@ -124,17 +124,13 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
for _, osdmetadata := range osdsmetadata {
if osdmetadata.ID == device.ID {
device.Info = osdmetadata
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device)
}
}
devices = append(devices, device)
}
}
}
if len(devices) != 0 {
osdhosts[hostbucket.Name] = BenchOsd{Osds: devices}
devices = []Device{}
}
}
if len(osdhosts) == 0 {
log.Fatalf("Defined osd not exist in root for rule: %v pool: %v.\nYou should define osd like osd.X",
@ -149,18 +145,14 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
for _, osdmetadata := range osdsmetadata {
if osdmetadata.ID == device.ID {
device.Info = osdmetadata
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device)
}
}
devices = append(devices, device)
}
}
}
}
if len(devices) != 0 {
osdhosts[hostbucket.Name] = BenchOsd{Osds: devices}
devices = []Device{}
}
}
if len(osdhosts) == 0 {
log.Fatalf("Defined host not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool)
@ -177,14 +169,10 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum
}
}
devices = append(devices, device)
osdhosts[hostbucket.Name] = append(osdhosts[hostbucket.Name], device)
}
}
}
if len(devices) != 0 {
osdhosts[hostbucket.Name] = BenchOsd{Osds: devices}
devices = []Device{}
}
}
if len(osdhosts) == 0 {
log.Fatalf("Osd not exist in root for rule: %v pool: %v", crushrulename, poolinfo.Pool)
@ -202,7 +190,7 @@ func ContainsPg(pgs []PlacementGroup, i int64) bool {
return false
}
func GetOsds(cephconn *Cephconnection, params Params) map[string]BenchOsd {
func GetOsds(cephconn *Cephconnection, params Params) map[string][]Device {
poolinfo := GetPoolSize(cephconn, params)
if poolinfo.Size != 1 {
log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1",
@ -213,8 +201,8 @@ func GetOsds(cephconn *Cephconnection, params Params) map[string]BenchOsd {
osddump := GetOsdDump(cephconn)
osdsmetadata := GetOsdMetadata(cephconn)
osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata)
for _, values := range osddevices {
for _, item := range values.Osds {
for _, devices := range osddevices {
for _, item := range devices {
if exist := ContainsPg(placementGroups, item.ID); exist == false {
log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num")
}

View File

@ -6,11 +6,11 @@ import (
)
// Params holds the parsed command-line configuration for a benchmark run:
// timing, thread count, block/object sizes, parallelism flag, and the
// string-valued ceph connection/selection options.
type Params struct {
	duration              time.Duration
	threadsCount          int64
	blocksize, objectsize int64
	parallel              bool
	// bs/os are the raw user-supplied size strings; blocksize/objectsize
	// hold their parsed byte values.
	bs, os, cluster, user, keyring, config, pool, mode, define string
}
type Cephconnection struct {
@ -373,9 +373,3 @@ type OsdMetadata struct {
OsdObjectstore string `json:"osd_objectstore"`
Rotational string `json:"rotational"`
}
// BenchOsd groups the Devices of one host bucket (elsewhere in this file it
// is built as BenchOsd{Osds: devices} per host). Buffs and Offsets are never
// populated in the visible code — presumably shared write buffers and
// per-thread offsets; confirm against callers.
// NOTE(review): the diff shown here appears to delete this type; the code
// now uses map[string][]Device directly instead.
type BenchOsd struct {
Osds []Device
Buffs *[][]byte
Offsets []int64
}