diff --git a/ceph-gobench.go b/ceph-gobench.go index fbec44e..580a94e 100644 --- a/ceph-gobench.go +++ b/ceph-gobench.go @@ -16,7 +16,7 @@ import ( "time" ) -func bench(cephconn *Cephconnection, osddevice Device, buff *[]byte, startbuff *[]byte, params *Params, +func bench(cephconn *cephconnection, osddevice Device, buff *[]byte, startbuff *[]byte, params *params, wg *sync.WaitGroup, result chan string) { defer wg.Done() threadresult := make(chan []time.Duration, params.threadsCount) @@ -30,7 +30,7 @@ func bench(cephconn *Cephconnection, osddevice Device, buff *[]byte, startbuff * // calculate object for each thread for suffix := 0; len(objectnames) < int(params.threadsCount)*16; suffix++ { name := "bench_" + strconv.Itoa(suffix) - if osddevice.ID == GetObjActingPrimary(cephconn, *params, name) { + if osddevice.ID == getObjActingPrimary(cephconn, *params, name) { objectnames = append(objectnames, name) if err := cephconn.ioctx.WriteFull(name, *startbuff); err != nil { log.Printf("Can't write object: %v, osd: %v", name, osddevice.Name) @@ -41,7 +41,7 @@ func bench(cephconn *Cephconnection, osddevice Device, buff *[]byte, startbuff * } } for i := 0; i < int(params.threadsCount); i++ { - go BenchThread(cephconn, osddevice, params, buff, threadresult, objectnames[i*16:i*16+16]) + go benchthread(cephconn, osddevice, params, buff, threadresult, objectnames[i*16:i*16+16]) } for i := uint64(0); i < params.threadsCount; i++ { for _, lat := range <-threadresult { @@ -174,7 +174,7 @@ func bench(cephconn *Cephconnection, osddevice Device, buff *[]byte, startbuff * result <- buffer.String() } -func BenchThread(cephconn *Cephconnection, osddevice Device, params *Params, buff *[]byte, +func benchthread(cephconn *cephconnection, osddevice Device, params *params, buff *[]byte, result chan []time.Duration, objnames []string) { var latencies []time.Duration starttime := time.Now() @@ -198,7 +198,7 @@ func BenchThread(cephconn *Cephconnection, osddevice Device, params *Params, buf } func main() { - params := Route() + params := route() if params.cpuprofile != "" { f, err := os.Create(params.cpuprofile) if err != nil { @@ -229,7 +229,7 @@ func main() { time.Sleep(time.Millisecond * 100) startbuff := make([]byte, 4096) - osddevices := GetOsds(cephconn, params) + osddevices := getOsds(cephconn, params) buff := make([]byte, params.blocksize) rand.Read(buff) diff --git a/cephconnection.go b/cephconnection.go index 4aaa256..286dd51 100644 --- a/cephconnection.go +++ b/cephconnection.go @@ -6,8 +6,8 @@ import ( "os" ) -func connectioninit(params Params) *Cephconnection { - cephconn := &Cephconnection{} +func connectioninit(params params) *cephconnection { + cephconn := &cephconnection{} var err error if _, err := os.Stat(params.config); os.IsNotExist(err) { log.Fatalf("Congif file not exists. Error: %v\n", err) diff --git a/flags.go b/flags.go index 119d99f..075da3c 100644 --- a/flags.go +++ b/flags.go @@ -7,8 +7,8 @@ import ( "time" ) -func Route() Params { - params := Params{} +func route() params { + params := params{} gnuflag.DurationVar(¶ms.duration, "duration", 30*time.Second, "Time limit for each test in seconds") gnuflag.DurationVar(¶ms.duration, "d", 30*time.Second, diff --git a/getosd.go b/getosd.go index cd8a62b..3d2872e 100644 --- a/getosd.go +++ b/getosd.go @@ -6,7 +6,7 @@ import ( "strings" ) -func MakeMonQuery(cephconn *Cephconnection, query map[string]string) []byte { +func makeMonQuery(cephconn *cephconnection, query map[string]string) []byte { monjson, err := json.Marshal(query) if err != nil { log.Fatalf("Can't marshal json mon query. Error: %v", err) @@ -19,8 +19,8 @@ func MakeMonQuery(cephconn *Cephconnection, query map[string]string) []byte { return monrawanswer } -func GetPoolSize(cephconn *Cephconnection, params Params) Poolinfo { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd pool get", "pool": params.pool, +func getPoolSize(cephconn *cephconnection, params params) Poolinfo { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "osd pool get", "pool": params.pool, "format": "json", "var": "size"}) monanswer := Poolinfo{} if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { @@ -30,8 +30,8 @@ func GetPoolSize(cephconn *Cephconnection, params Params) Poolinfo { } -func GetPgByPool(cephconn *Cephconnection, params Params) []PlacementGroup { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "pg ls-by-pool", "poolstr": params.pool, +func getPgByPool(cephconn *cephconnection, params params) []PlacementGroup { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "pg ls-by-pool", "poolstr": params.pool, "format": "json"}) var monanswer []PlacementGroup if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { @@ -40,8 +40,8 @@ func GetPgByPool(cephconn *Cephconnection, params Params) []PlacementGroup { return monanswer } -func GetOsdCrushDump(cephconn *Cephconnection) OsdCrushDump { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd crush dump", "format": "json"}) +func getOsdCrushDump(cephconn *cephconnection) OsdCrushDump { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "osd crush dump", "format": "json"}) var monanswer OsdCrushDump if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { log.Fatalf("Can't parse monitor answer. Error: %v", err) @@ -49,8 +49,8 @@ func GetOsdCrushDump(cephconn *Cephconnection) OsdCrushDump { return monanswer } -func GetOsdDump(cephconn *Cephconnection) OsdDump { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd dump", "format": "json"}) +func getOsdDump(cephconn *cephconnection) OsdDump { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "osd dump", "format": "json"}) var monanswer OsdDump if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { log.Fatalf("Can't parse monitor answer. Error: %v", err) @@ -58,8 +58,8 @@ func GetOsdDump(cephconn *Cephconnection) OsdDump { return monanswer } -func GetOsdMetadata(cephconn *Cephconnection) []OsdMetadata { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd metadata", "format": "json"}) +func getOsdMetadata(cephconn *cephconnection) []OsdMetadata { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "osd metadata", "format": "json"}) var monanswer []OsdMetadata if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { log.Fatalf("Can't parse monitor answer. Error: %v", err) @@ -67,8 +67,8 @@ func GetOsdMetadata(cephconn *Cephconnection) []OsdMetadata { return monanswer } -func GetObjActingPrimary(cephconn *Cephconnection, params Params, objname string) int64 { - monrawanswer := MakeMonQuery(cephconn, map[string]string{"prefix": "osd map", "pool": params.pool, +func getObjActingPrimary(cephconn *cephconnection, params params, objname string) int64 { + monrawanswer := makeMonQuery(cephconn, map[string]string{"prefix": "osd map", "pool": params.pool, "object": objname, "format": "json"}) var monanswer OsdMap if err := json.Unmarshal([]byte(monrawanswer), &monanswer); err != nil { @@ -77,7 +77,7 @@ func GetObjActingPrimary(cephconn *Cephconnection, params Params, objname string return monanswer.UpPrimary } -func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { +func getCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { var rootbuckets []Bucket for _, bucket := range buckets { if bucket.ID == itemid { @@ -85,7 +85,7 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { rootbuckets = append(rootbuckets, bucket) } else { for _, item := range bucket.Items { - result := GetCrushHostBuckets(buckets, item.ID) + result := getCrushHostBuckets(buckets, item.ID) for _, it := range result { rootbuckets = append(rootbuckets, it) } @@ -96,7 +96,7 @@ func GetCrushHostBuckets(buckets []Bucket, itemid int64) []Bucket { return rootbuckets } -func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) []Device { +func getOsdForLocations(params params, osdcrushdump OsdCrushDump, osddump OsdDump, poolinfo Poolinfo, osdsmetadata []OsdMetadata) []Device { var crushrule, rootid int64 var crushrulename string for _, pool := range osddump.Pools { @@ -120,7 +120,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum } var osddevices []Device - bucketitems := GetCrushHostBuckets(osdcrushdump.Buckets, rootid) + bucketitems := getCrushHostBuckets(osdcrushdump.Buckets, rootid) if params.define != "" { if strings.HasPrefix(params.define, "osd.") { for _, hostbucket := range bucketitems { @@ -185,7 +185,7 @@ func GetOsdForLocations(params Params, osdcrushdump OsdCrushDump, osddump OsdDum return osddevices } -func ContainsPg(pgs []PlacementGroup, i int64) bool { +func containsPg(pgs []PlacementGroup, i int64) bool { for _, pg := range pgs { if i == pg.ActingPrimary { return true @@ -194,19 +194,19 @@ func ContainsPg(pgs []PlacementGroup, i int64) bool { return false } -func GetOsds(cephconn *Cephconnection, params Params) []Device { - poolinfo := GetPoolSize(cephconn, params) +func getOsds(cephconn *cephconnection, params params) []Device { + poolinfo := getPoolSize(cephconn, params) if poolinfo.Size != 1 { log.Fatalf("Pool size must be 1. Current size for pool %v is %v. Don't forget that it must be useless pool (not production). Do:\n # ceph osd pool set %v min_size 1\n # ceph osd pool set %v size 1", poolinfo.Pool, poolinfo.Size, poolinfo.Pool, poolinfo.Pool) } - placementGroups := GetPgByPool(cephconn, params) - crushosddump := GetOsdCrushDump(cephconn) - osddump := GetOsdDump(cephconn) - osdsmetadata := GetOsdMetadata(cephconn) - osddevices := GetOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata) + placementGroups := getPgByPool(cephconn, params) + crushosddump := getOsdCrushDump(cephconn) + osddump := getOsdDump(cephconn) + osdsmetadata := getOsdMetadata(cephconn) + osddevices := getOsdForLocations(params, crushosddump, osddump, poolinfo, osdsmetadata) for _, device := range osddevices { - if exist := ContainsPg(placementGroups, device.ID); exist == false { + if exist := containsPg(placementGroups, device.ID); exist == false { log.Fatalln("Not enough pg for test. Some osd haven't placement group at all. Increase pg_num and pgp_num") } } diff --git a/types.go b/types.go index eff30c8..7fac2e1 100644 --- a/types.go +++ b/types.go @@ -5,7 +5,7 @@ import ( "time" ) -type Params struct { +type params struct { duration time.Duration threadsCount uint64 blocksize, objectsize uint64 @@ -13,7 +13,7 @@ type Params struct { bs, os, cluster, user, keyring, config, pool, define, cpuprofile, memprofile string } -type Cephconnection struct { +type cephconnection struct { conn *rados.Conn ioctx *rados.IOContext }