diff --git a/hsbench.go b/hsbench.go index 5ec745e..c000299 100644 --- a/hsbench.go +++ b/hsbench.go @@ -6,6 +6,7 @@ package main import ( "bytes" + "code.cloudfoundry.org/bytefmt" "crypto/hmac" "crypto/md5" "crypto/sha1" @@ -15,6 +16,10 @@ import ( "encoding/json" "flag" "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" "io" "io/ioutil" "log" @@ -29,11 +34,6 @@ import ( "sync" "sync/atomic" "time" - "code.cloudfoundry.org/bytefmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" ) // Global variables @@ -136,82 +136,82 @@ func setSignature(req *http.Request) { } type IntervalStats struct { - loop int - name string - mode string - bytes int64 - slowdowns int64 - intervalNano int64 - latNano []int64 + loop int + name string + mode string + bytes int64 + slowdowns int64 + intervalNano int64 + latNano []int64 } func (is *IntervalStats) makeOutputStats() OutputStats { - // Compute and log the stats - ops := len(is.latNano) - totalLat := int64(0); - minLat := float64(0); - maxLat := float64(0); - NinetyNineLat := float64(0); - avgLat := float64(0); - if ops > 0 { - minLat = float64(is.latNano[0]) / 1000000 - maxLat = float64(is.latNano[ops - 1]) / 1000000 - for i := range is.latNano { - totalLat += is.latNano[i] - } - avgLat = float64(totalLat) / float64(ops) / 1000000 - NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops))) - 1] - NinetyNineLat = float64(NintyNineLatNano) / 1000000 - } - seconds := float64(is.intervalNano) / 1000000000 - mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE - iops := float64(ops) / seconds + // Compute and log the stats + ops := len(is.latNano) + totalLat := int64(0) + minLat := float64(0) + maxLat := float64(0) + NinetyNineLat := float64(0) + avgLat := float64(0) + if ops > 0 { + minLat = float64(is.latNano[0]) / 1000000 + maxLat = float64(is.latNano[ops-1]) / 1000000 + for i := range is.latNano { + totalLat += is.latNano[i] + } + avgLat = float64(totalLat) / float64(ops) / 1000000 + NintyNineLatNano := is.latNano[int64(math.Round(0.99*float64(ops)))-1] + NinetyNineLat = float64(NintyNineLatNano) / 1000000 + } + seconds := float64(is.intervalNano) / 1000000000 + mbps := float64(is.bytes) / seconds / bytefmt.MEGABYTE + iops := float64(ops) / seconds - return OutputStats{ - is.loop, - is.name, - seconds, - is.mode, - ops, - mbps, - iops, - minLat, - avgLat, - NinetyNineLat, - maxLat, - is.slowdowns} + return OutputStats{ + is.loop, + is.name, + seconds, + is.mode, + ops, + mbps, + iops, + minLat, + avgLat, + NinetyNineLat, + maxLat, + is.slowdowns} } type OutputStats struct { - Loop int - IntervalName string - Seconds float64 - Mode string - Ops int - Mbps float64 - Iops float64 - MinLat float64 - AvgLat float64 - NinetyNineLat float64 - MaxLat float64 - Slowdowns int64 + Loop int + IntervalName string + Seconds float64 + Mode string + Ops int + Mbps float64 + Iops float64 + MinLat float64 + AvgLat float64 + NinetyNineLat float64 + MaxLat float64 + Slowdowns int64 } func (o *OutputStats) log() { log.Printf( - "Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d", - o.Loop, - o.IntervalName, - o.Seconds, - o.Mode, - o.Ops, - o.Mbps, - o.Iops, - o.MinLat, - o.AvgLat, - o.NinetyNineLat, - o.MaxLat, - o.Slowdowns) + "Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d", + o.Loop, + o.IntervalName, + o.Seconds, + o.Mode, + o.Ops, + o.Mbps, + o.Iops, + o.MinLat, + o.AvgLat, + o.NinetyNineLat, + o.MaxLat, + o.Slowdowns) } func (o *OutputStats) csv_header(w *csv.Writer) { @@ -242,7 +242,7 @@ func (o *OutputStats) csv(w *csv.Writer) { log.Fatal("OutputStats Passed nil csv writer") } - s := []string { + s := []string{ strconv.Itoa(o.Loop), o.IntervalName, strconv.FormatFloat(o.Seconds, 'f', 2, 64), @@ -257,7 +257,7 @@ func (o *OutputStats) csv(w *csv.Writer) { strconv.FormatInt(o.Slowdowns, 10)} if err := w.Write(s); err != nil { - log.Fatal("Error writing to CSV writer: ",err) + log.Fatal("Error writing to CSV writer: ", err) } } @@ -273,13 +273,13 @@ func (o *OutputStats) json(jfile *os.File) { _, err = jfile.WriteString(string(jdata) + "\n") if err != nil { log.Fatal("Error writing to JSON file: ", err) - } + } } type ThreadStats struct { - start int64 + start int64 curInterval int64 - intervals []IntervalStats + intervals []IntervalStats } func makeThreadStats(s int64, loop int, mode string, intervalNano int64) ThreadStats { @@ -293,8 +293,8 @@ func (ts *ThreadStats) updateIntervals(loop int, mode string, intervalNano int64 if intervalNano < 0 { return ts.curInterval } - for ts.start + intervalNano*(ts.curInterval+1) < time.Now().UnixNano() { - ts.curInterval++ + for ts.start+intervalNano*(ts.curInterval+1) < time.Now().UnixNano() { + ts.curInterval++ ts.intervals = append( ts.intervals, IntervalStats{ @@ -318,10 +318,10 @@ type Stats struct { threads int // The loop we are in loop int - // Test mode being run - mode string + // Test mode being run + mode string // start time in nanoseconds - startNano int64 + startNano int64 // end time in nanoseconds endNano int64 // Duration in nanoseconds for each interval @@ -329,7 +329,7 @@ type Stats struct { // Per-thread statistics threadStats []ThreadStats // a map of per-interval thread completion counters - intervalCompletions sync.Map + intervalCompletions sync.Map // a counter of how many threads have finished updating stats entirely completions int32 } @@ -345,73 +345,73 @@ func makeStats(loop int, mode string, threads int, intervalNano int64) Stats { } func (stats *Stats) makeOutputStats(i int64) (OutputStats, bool) { - // Check bounds first - if stats.intervalNano < 0 || i < 0 { - return OutputStats{}, false - } - // Not safe to log if not all writers have completed. - value, ok := stats.intervalCompletions.Load(i) - if !ok { - return OutputStats{}, false - } - cp, ok := value.(*int32) - if !ok { - return OutputStats{}, false - } - count := atomic.LoadInt32(cp) - if count < int32(stats.threads) { - return OutputStats{}, false - } + // Check bounds first + if stats.intervalNano < 0 || i < 0 { + return OutputStats{}, false + } + // Not safe to log if not all writers have completed. + value, ok := stats.intervalCompletions.Load(i) + if !ok { + return OutputStats{}, false + } + cp, ok := value.(*int32) + if !ok { + return OutputStats{}, false + } + count := atomic.LoadInt32(cp) + if count < int32(stats.threads) { + return OutputStats{}, false + } - bytes := int64(0) - ops := int64(0) - slowdowns := int64(0) + bytes := int64(0) + ops := int64(0) + slowdowns := int64(0) - for t := 0; t < stats.threads; t++ { - bytes += stats.threadStats[t].intervals[i].bytes - ops += int64(len(stats.threadStats[t].intervals[i].latNano)) - slowdowns += stats.threadStats[t].intervals[i].slowdowns - } - // Aggregate the per-thread Latency slice - tmpLat := make([]int64, ops) - var c int - for t := 0; t < stats.threads; t++ { - c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) - } - sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) - is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat} + for t := 0; t < stats.threads; t++ { + bytes += stats.threadStats[t].intervals[i].bytes + ops += int64(len(stats.threadStats[t].intervals[i].latNano)) + slowdowns += stats.threadStats[t].intervals[i].slowdowns + } + // Aggregate the per-thread Latency slice + tmpLat := make([]int64, ops) + var c int + for t := 0; t < stats.threads; t++ { + c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) + } + sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) + is := IntervalStats{stats.loop, strconv.FormatInt(i, 10), stats.mode, bytes, slowdowns, stats.intervalNano, tmpLat} return is.makeOutputStats(), true } func (stats *Stats) makeTotalStats() (OutputStats, bool) { - // Not safe to log if not all writers have completed. - completions := atomic.LoadInt32(&stats.completions) - if (completions < int32(threads)) { - log.Printf("log, completions: %d", completions) - return OutputStats{}, false - } + // Not safe to log if not all writers have completed. + completions := atomic.LoadInt32(&stats.completions) + if completions < int32(threads) { + log.Printf("log, completions: %d", completions) + return OutputStats{}, false + } - bytes := int64(0) - ops := int64(0) - slowdowns := int64(0); + bytes := int64(0) + ops := int64(0) + slowdowns := int64(0) for t := 0; t < stats.threads; t++ { for i := 0; i < len(stats.threadStats[t].intervals); i++ { - bytes += stats.threadStats[t].intervals[i].bytes - ops += int64(len(stats.threadStats[t].intervals[i].latNano)) - slowdowns += stats.threadStats[t].intervals[i].slowdowns - } + bytes += stats.threadStats[t].intervals[i].bytes + ops += int64(len(stats.threadStats[t].intervals[i].latNano)) + slowdowns += stats.threadStats[t].intervals[i].slowdowns + } } - // Aggregate the per-thread Latency slice - tmpLat := make([]int64, ops) - var c int - for t := 0; t < stats.threads; t++ { - for i := 0; i < len(stats.threadStats[t].intervals); i++ { - c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) - } + // Aggregate the per-thread Latency slice + tmpLat := make([]int64, ops) + var c int + for t := 0; t < stats.threads; t++ { + for i := 0; i < len(stats.threadStats[t].intervals); i++ { + c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) + } } - sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) - is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat} + sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) + is := IntervalStats{stats.loop, "TOTAL", stats.mode, bytes, slowdowns, stats.endNano - stats.startNano, tmpLat} return is.makeOutputStats(), true } @@ -477,15 +477,15 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { if duration_secs > -1 && time.Now().After(endtime) { break } - objnum := atomic.AddInt64(&op_counter, 1) - bucket_num := objnum % int64(bucket_count) + objnum := atomic.AddInt64(&op_counter, 1) + bucket_num := objnum % int64(bucket_count) if object_count > -1 && objnum >= object_count { objnum = atomic.AddInt64(&op_counter, -1) break } fileobj := bytes.NewReader(object_data) - key := fmt.Sprintf("%s%012d", object_prefix, objnum) + key := fmt.Sprintf("%s%012d", object_prefix, objnum) r := &s3.PutObjectInput{ Bucket: &buckets[bucket_num], Key: &key, @@ -496,12 +496,12 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { // Disable payload checksum calculation (very expensive) req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err := req.Send() - end := time.Now().UnixNano() - stats.updateIntervals(thread_num) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) if err != nil { errcnt++ - stats.addSlowDown(thread_num); + stats.addSlowDown(thread_num) atomic.AddInt64(&op_counter, -1) log.Printf("upload err", err) } else { @@ -512,7 +512,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { break } } - stats.finish(thread_num) + stats.finish(thread_num) atomic.AddInt64(&running_threads, -1) } @@ -521,46 +521,46 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) { svc := s3.New(session.New(), cfg) for { if duration_secs > -1 && time.Now().After(endtime) { - break - } + break + } objnum := atomic.AddInt64(&op_counter, 1) - if object_count > -1 && objnum >= object_count { + if object_count > -1 && objnum >= object_count { atomic.AddInt64(&op_counter, -1) - break - } + break + } bucket_num := objnum % int64(bucket_count) - key := fmt.Sprintf("%s%012d", object_prefix, objnum) - r := &s3.GetObjectInput{ - Bucket: &buckets[bucket_num], - Key: &key, - } + key := fmt.Sprintf("%s%012d", object_prefix, objnum) + r := &s3.GetObjectInput{ + Bucket: &buckets[bucket_num], + Key: &key, + } start := time.Now().UnixNano() - req, resp := svc.GetObjectRequest(r) - err := req.Send() + req, resp := svc.GetObjectRequest(r) + err := req.Send() end := time.Now().UnixNano() stats.updateIntervals(thread_num) - if err != nil { - errcnt++ - stats.addSlowDown(thread_num); - log.Printf("download err", err) + if err != nil { + errcnt++ + stats.addSlowDown(thread_num) + log.Printf("download err", err) } else { // Update the stats stats.addOp(thread_num, object_size, end-start) } - if err == nil { - _, err = io.Copy(ioutil.Discard, resp.Body) - } - if errcnt > 2 { - break - } + if err == nil { + _, err = io.Copy(ioutil.Discard, resp.Body) + } + if errcnt > 2 { + break + } } - stats.finish(thread_num) + stats.finish(thread_num) atomic.AddInt64(&running_threads, -1) } @@ -569,43 +569,43 @@ func runDelete(thread_num int, stats *Stats) { svc := s3.New(session.New(), cfg) for { - if duration_secs > -1 && time.Now().After(endtime) { - break - } - - objnum := atomic.AddInt64(&op_counter, 1) - if object_count > -1 && objnum >= object_count { - atomic.AddInt64(&op_counter, -1) + if duration_secs > -1 && time.Now().After(endtime) { break } - bucket_num := objnum % int64(bucket_count) + objnum := atomic.AddInt64(&op_counter, 1) + if object_count > -1 && objnum >= object_count { + atomic.AddInt64(&op_counter, -1) + break + } - key := fmt.Sprintf("%s%012d", object_prefix, objnum) - r := &s3.DeleteObjectInput{ - Bucket: &buckets[bucket_num], - Key: &key, - } + bucket_num := objnum % int64(bucket_count) + + key := fmt.Sprintf("%s%012d", object_prefix, objnum) + r := &s3.DeleteObjectInput{ + Bucket: &buckets[bucket_num], + Key: &key, + } start := time.Now().UnixNano() - req, out := svc.DeleteObjectRequest(r) - err := req.Send() + req, out := svc.DeleteObjectRequest(r) + err := req.Send() end := time.Now().UnixNano() stats.updateIntervals(thread_num) - if err != nil { - errcnt++ - stats.addSlowDown(thread_num); - log.Printf("delete err", err, "out", out.String()) - } else { + if err != nil { + errcnt++ + stats.addSlowDown(thread_num) + log.Printf("delete err", err, "out", out.String()) + } else { // Update the stats stats.addOp(thread_num, object_size, end-start) } - if errcnt > 2 { + if errcnt > 2 { break - } + } } - stats.finish(thread_num) + stats.finish(thread_num) atomic.AddInt64(&running_threads, -1) } @@ -615,9 +615,9 @@ func runBucketDelete(thread_num int, stats *Stats) { for { bucket_num := atomic.AddInt64(&op_counter, 1) if bucket_num >= bucket_count { - atomic.AddInt64(&op_counter, -1) - break - } + atomic.AddInt64(&op_counter, -1) + break + } r := &s3.DeleteBucketInput{ Bucket: &buckets[bucket_num], } @@ -627,156 +627,156 @@ func runBucketDelete(thread_num int, stats *Stats) { end := time.Now().UnixNano() stats.updateIntervals(thread_num) - if err != nil { + if err != nil { break - } - stats.addOp(thread_num, 0, end-start) - } - stats.finish(thread_num) - atomic.AddInt64(&running_threads, -1) + } + stats.addOp(thread_num, 0, end-start) + } + stats.finish(thread_num) + atomic.AddInt64(&running_threads, -1) } var cfg *aws.Config func runBucketsInit(thread_num int, stats *Stats) { - svc := s3.New(session.New(), cfg) + svc := s3.New(session.New(), cfg) for { - bucket_num := atomic.AddInt64(&op_counter, 1) + bucket_num := atomic.AddInt64(&op_counter, 1) if bucket_num >= bucket_count { atomic.AddInt64(&op_counter, -1) break } - start := time.Now().UnixNano() - in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} - _, err := svc.CreateBucket(in) - end := time.Now().UnixNano() - stats.updateIntervals(thread_num) + start := time.Now().UnixNano() + in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} + _, err := svc.CreateBucket(in) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) if err != nil { - if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) && - !strings.Contains(err.Error(), "BucketAlreadyExists") { - log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) - } + if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) && + !strings.Contains(err.Error(), "BucketAlreadyExists") { + log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) + } } - stats.addOp(thread_num, 0, end-start) + stats.addOp(thread_num, 0, end-start) } - stats.finish(thread_num) - atomic.AddInt64(&running_threads, -1) + stats.finish(thread_num) + atomic.AddInt64(&running_threads, -1) } func runBucketsClear(thread_num int, stats *Stats) { - svc := s3.New(session.New(), cfg) + svc := s3.New(session.New(), cfg) - for { - bucket_num := atomic.AddInt64(&op_counter, 1) - if bucket_num >= bucket_count { - atomic.AddInt64(&op_counter, -1) - break - } - out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) - if err != nil { + for { + bucket_num := atomic.AddInt64(&op_counter, 1) + if bucket_num >= bucket_count { + atomic.AddInt64(&op_counter, -1) break - } - n := len(out.Contents) - for n > 0 { - for _, v := range out.Contents { + } + out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) + if err != nil { + break + } + n := len(out.Contents) + for n > 0 { + for _, v := range out.Contents { start := time.Now().UnixNano() - svc.DeleteObject(&s3.DeleteObjectInput{ - Bucket: &buckets[bucket_num], - Key: v.Key, - }) + svc.DeleteObject(&s3.DeleteObjectInput{ + Bucket: &buckets[bucket_num], + Key: v.Key, + }) end := time.Now().UnixNano() - stats.updateIntervals(thread_num) - stats.addOp(thread_num, *v.Size, end-start) + stats.updateIntervals(thread_num) + stats.addOp(thread_num, *v.Size, end-start) - } - out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) - if err != nil { + } + out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) + if err != nil { break - } - n = len(out.Contents) - } - } - stats.finish(thread_num) - atomic.AddInt64(&running_threads, -1) + } + n = len(out.Contents) + } + } + stats.finish(thread_num) + atomic.AddInt64(&running_threads, -1) } func runWrapper(loop int, r rune) []OutputStats { - op_counter = -1 - running_threads = int64(threads) - intervalNano := int64(interval*1000000000) - endtime = time.Now().Add(time.Second * time.Duration(duration_secs)) + op_counter = -1 + running_threads = int64(threads) + intervalNano := int64(interval * 1000000000) + endtime = time.Now().Add(time.Second * time.Duration(duration_secs)) var stats Stats - // If we perviously set the object count after running a put - // test, set the object count back to -1 for the new put test. + // If we perviously set the object count after running a put + // test, set the object count back to -1 for the new put test. if r == 'p' && object_count_flag { - object_count = -1 - object_count_flag = false - } + object_count = -1 + object_count_flag = false + } switch r { - case 'c': - log.Printf("Running Loop %d BUCKET CLEAR TEST", loop) - stats = makeStats(loop, "BCLR", threads, intervalNano) - for n := 0; n < threads; n++ { - go runBucketsClear(n, &stats); - } - case 'x': - log.Printf("Running Loop %d BUCKET DELETE TEST", loop) - stats = makeStats(loop, "BDEL", threads, intervalNano) - for n := 0; n < threads; n++ { - go runBucketDelete(n, &stats); - } + case 'c': + log.Printf("Running Loop %d BUCKET CLEAR TEST", loop) + stats = makeStats(loop, "BCLR", threads, intervalNano) + for n := 0; n < threads; n++ { + go runBucketsClear(n, &stats) + } + case 'x': + log.Printf("Running Loop %d BUCKET DELETE TEST", loop) + stats = makeStats(loop, "BDEL", threads, intervalNano) + for n := 0; n < threads; n++ { + go runBucketDelete(n, &stats) + } case 'i': log.Printf("Running Loop %d BUCKET INIT TEST", loop) stats = makeStats(loop, "BINIT", threads, intervalNano) for n := 0; n < threads; n++ { - go runBucketsInit(n, &stats); + go runBucketsInit(n, &stats) } case 'p': log.Printf("Running Loop %d OBJECT PUT TEST", loop) - stats = makeStats(loop, "PUT", threads, intervalNano) + stats = makeStats(loop, "PUT", threads, intervalNano) for n := 0; n < threads; n++ { - go runUpload(n, endtime, &stats); + go runUpload(n, endtime, &stats) } - case 'g': - log.Printf("Running Loop %d OBJECT GET TEST", loop) - stats = makeStats(loop, "GET", threads, intervalNano) - for n := 0; n < threads; n++ { - go runDownload(n, endtime, &stats); + case 'g': + log.Printf("Running Loop %d OBJECT GET TEST", loop) + stats = makeStats(loop, "GET", threads, intervalNano) + for n := 0; n < threads; n++ { + go runDownload(n, endtime, &stats) } - case 'd': - log.Printf("Running Loop %d OBJECT DELETE TEST", loop) - stats = makeStats(loop, "DEL", threads, intervalNano) - for n := 0; n < threads; n++ { - go runDelete(n, &stats); - } - } + case 'd': + log.Printf("Running Loop %d OBJECT DELETE TEST", loop) + stats = makeStats(loop, "DEL", threads, intervalNano) + for n := 0; n < threads; n++ { + go runDelete(n, &stats) + } + } - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } + // Wait for it to finish + for atomic.LoadInt64(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } - // If the user didn't set the object_count, we can set it here - // to limit subsequent get/del tests to valid objects only. + // If the user didn't set the object_count, we can set it here + // to limit subsequent get/del tests to valid objects only. if r == 'p' && object_count < 0 { object_count = op_counter + 1 - object_count_flag = true - } + object_count_flag = true + } // Create the Output Stats os := make([]OutputStats, 0) for i := int64(0); i >= 0; i++ { if o, ok := stats.makeOutputStats(i); ok { - os = append(os, o) + os = append(os, o) } else { - break + break } } - if o, ok := stats.makeTotalStats(); ok { + if o, ok := stats.makeTotalStats(); ok { o.log() os = append(os, o) } @@ -789,22 +789,22 @@ func init() { myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key") myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key") myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix") - myflag.StringVar(&object_prefix, "op", "", "Prefix for objects") + myflag.StringVar(&object_prefix, "op", "", "Prefix for objects") myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") - myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info") + myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info") myflag.StringVar(&output, "o", "", "Write CSV output to this file") myflag.StringVar(&json_output, "j", "", "Write JSON output to this file") - myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") - myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") + myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") + myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals") // define custom usage output with notes - notes := -` + notes := + ` NOTES: - Valid mode types for the -m mode string are: c: clear all existing objects from buckets (requires lookups) @@ -823,7 +823,7 @@ NOTES: fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0]) fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n") myflag.PrintDefaults() - fmt.Fprintf(flag.CommandLine.Output(), notes); + fmt.Fprintf(flag.CommandLine.Output(), notes) } if err := myflag.Parse(os.Args[1:]); err != nil { @@ -845,13 +845,12 @@ NOTES: } invalid_mode := false for _, r := range modes { - if ( - r != 'i' && - r != 'c' && - r != 'p' && - r != 'g' && - r != 'd' && - r != 'x') { + if r != 'i' && + r != 'c' && + r != 'p' && + r != 'g' && + r != 'd' && + r != 'x' { s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r)) log.Printf(s) invalid_mode = true @@ -859,7 +858,7 @@ NOTES: } if invalid_mode { log.Fatal("Invalid modes passed to -m, see help for details.") - } + } var err error var size uint64 if size, err = bytefmt.ToBytes(sizeArg); err != nil { @@ -869,12 +868,12 @@ NOTES: } func initData() { - // Initialize data for the bucket - object_data = make([]byte, object_size) - rand.Read(object_data) - hasher := md5.New() - hasher.Write(object_data) - object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) + // Initialize data for the bucket + object_data = make([]byte, object_size) + rand.Read(object_data) + hasher := md5.New() + hasher.Write(object_data) + object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) } func main() { @@ -911,51 +910,51 @@ func main() { initData() // Setup the slice of buckets - for i := int64(0); i < bucket_count; i++ { + for i := int64(0); i < bucket_count; i++ { buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) } - // Loop running the tests - oStats := make([]OutputStats, 0) - for loop := 0; loop < loops; loop++ { - for _, r := range modes { - oStats = append(oStats, runWrapper(loop, r)...) - } - } + // Loop running the tests + oStats := make([]OutputStats, 0) + for loop := 0; loop < loops; loop++ { + for _, r := range modes { + oStats = append(oStats, runWrapper(loop, r)...) + } + } // Write CSV Output if output != "" { - file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777) + file, err := os.OpenFile(output, os.O_CREATE|os.O_WRONLY, 0777) defer file.Close() if err != nil { log.Fatal("Could not open CSV file for writing.") } else { csvWriter := csv.NewWriter(file) - for i, o := range oStats { + for i, o := range oStats { if i == 0 { o.csv_header(csvWriter) } - o.csv(csvWriter) - } - csvWriter.Flush() + o.csv(csvWriter) + } + csvWriter.Flush() } } // Write JSON output if json_output != "" { file, err := os.OpenFile(json_output, os.O_CREATE|os.O_WRONLY, 0777) - defer file.Close() - if err != nil { - log.Fatal("Could not open JSON file for writing.") - } + defer file.Close() + if err != nil { + log.Fatal("Could not open JSON file for writing.") + } data, err := json.Marshal(oStats) - if err != nil { - log.Fatal("Error marshaling JSON: ", err) - } - _, err = file.Write(data) - if err != nil { - log.Fatal("Error writing to JSON file: ", err) - } - file.Sync() + if err != nil { + log.Fatal("Error marshaling JSON: ", err) + } + _, err = file.Write(data) + if err != nil { + log.Fatal("Error writing to JSON file: ", err) + } + file.Sync() } }