From a9f8236b72832974a45a8b440262a14625f0b9cb Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Tue, 13 Aug 2019 13:22:07 -0400 Subject: [PATCH] s3-benchmark: Add interval stats Signed-off-by: Mark Nelson --- s3-benchmark.go | 471 ++++++++++++++++++++++++++++++++++-------------- 1 file changed, 340 insertions(+), 131 deletions(-) diff --git a/s3-benchmark.go b/s3-benchmark.go index fe9ccfa..629a982 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -15,15 +15,17 @@ import ( "io" "io/ioutil" "log" + "math" "math/rand" "net" "net/http" "os" "sort" + "strconv" "strings" + "sync" "sync/atomic" "time" - "code.cloudfoundry.org/bytefmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" @@ -37,9 +39,9 @@ var buckets []string var duration_secs, threads, loops, bucket_count int var object_data []byte var object_data_md5 string -var object_size uint64 -var running_threads, object_count, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int64 +var running_threads, object_count, object_size, op_count int64 var endtime, upload_finish, download_finish, delete_finish time.Time +var interval float64 func logit(msg string) { fmt.Println(msg) @@ -96,7 +98,6 @@ func getS3Client() *s3.S3 { func createBucket(bucket_num int, ignore_errors bool) { svc := s3.New(session.New(), cfg) - log.Printf(buckets[bucket_num]) in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} if _, err := svc.CreateBucket(in); err != nil { if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) || @@ -180,18 +181,248 @@ func setSignature(req *http.Request) { req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature)) } -func runUpload(thread_num int) { -// bucket_num := thread_num % bucket_count +type IntervalStats struct { + bytes int64 + slowdowns int64 + latNano []int64 +} + +type ThreadStats struct { + start int64 + curInterval int64 + intervals []IntervalStats +} + +func makeThreadStats(s int64, intervalNano int64) ThreadStats { + ts := ThreadStats{start: s, curInterval: -1} + ts.updateIntervals(intervalNano) + return ts +} + +func (ts *ThreadStats) updateIntervals(intervalNano int64) int64 { + if intervalNano < 0 { + return ts.curInterval + } + for ts.start + intervalNano*ts.curInterval < time.Now().UnixNano() { + ts.intervals = append(ts.intervals, IntervalStats{0, 0, []int64{}}) + ts.curInterval++ + } + return ts.curInterval +} + +func (ts *ThreadStats) finish() { + ts.curInterval = -1 +} + +type Stats struct { + // threads + threads int + // The loop we are in + loop int + // Test mode being run + mode string + // start time in nanoseconds + startNano int64 + // end time in nanoseconds + endNano int64 + // Duration in nanoseconds for each interval + intervalNano int64 + // Per-thread statistics + threadStats []ThreadStats + // a map of counters of how many threads have finished given interval of stats + intervalCompletions sync.Map + // a counter of how many threads have finished updating stats entirely + completions int32 +} + +func makeStats(loop int, mode string, threads int, intervalNano int64) Stats { + start := time.Now().UnixNano() + s := Stats{threads, loop, mode, start, 0, intervalNano, []ThreadStats{}, sync.Map{}, 0} + for i := 0; i < threads; i++ { + s.threadStats = append(s.threadStats, makeThreadStats(start, s.intervalNano)) + } + return s +} + +func (stats *Stats) _getIntervalStats(i int64) IntervalStats { + bytes := int64(0) + ops := int64(0) + slowdowns := int64(0); + + for t := 0; t < stats.threads; t++ { + bytes += stats.threadStats[t].intervals[i].bytes + ops += int64(len(stats.threadStats[t].intervals[i].latNano)) + slowdowns += stats.threadStats[t].intervals[i].slowdowns + } + // Aggregate the per-thread Latency slice + tmpLat := make([]int64, ops) + var c int + for t := 0; t < stats.threads; t++ { + c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) + } + sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) + return IntervalStats{bytes, slowdowns, tmpLat} +} + +func (stats *Stats) _getTotalStats() IntervalStats { + bytes := int64(0) + ops := int64(0) + slowdowns := int64(0); + + for t := 0; t < stats.threads; t++ { + for i := 0; i < len(stats.threadStats[t].intervals); i++ { + bytes += stats.threadStats[t].intervals[i].bytes + ops += int64(len(stats.threadStats[t].intervals[i].latNano)) + slowdowns += stats.threadStats[t].intervals[i].slowdowns + } + } + // Aggregate the per-thread Latency slice + tmpLat := make([]int64, ops) + var c int + for t := 0; t < stats.threads; t++ { + for i := 0; i < len(stats.threadStats[t].intervals); i++ { + c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano) + } + } + sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] }) + return IntervalStats{bytes, slowdowns, tmpLat} +} + +func (stats *Stats) logI(i int64) bool { + // Check bounds first + if stats.intervalNano < 0 || i < 0 { + return false; + } + // Not safe to log if not all writers have completed. + value, ok := stats.intervalCompletions.Load(i) + if !ok { + return false; + } + cp, ok := value.(*int32) + if !ok { + return false; + } + count := atomic.LoadInt32(cp) + if count < int32(stats.threads) { + return false; + } + + return stats._log(strconv.FormatInt(i, 10), stats.intervalNano, stats._getIntervalStats(i)) +} + +func (stats *Stats) log() bool { + // Not safe to log if not all writers have completed. + completions := atomic.LoadInt32(&stats.completions) + if (completions < int32(stats.threads)) { + log.Printf("log, completions: %d", completions) + return false; + } + return stats._log("ALL", stats.endNano - stats.startNano, stats._getTotalStats()) +} + +func (stats *Stats) _log(intervalName string, intervalNano int64, intervalStats IntervalStats) bool { + // Compute and log the stats + ops := len(intervalStats.latNano) + totalLat := int64(0); + minLat := float64(0); + maxLat := float64(0); + NinetyNineLat := float64(0); + avgLat := float64(0); + if ops > 0 { + minLat = float64(intervalStats.latNano[0]) / 1000000 + maxLat = float64(intervalStats.latNano[ops - 1]) / 1000000 + for i := range intervalStats.latNano { + totalLat += intervalStats.latNano[i] + } + avgLat = float64(totalLat) / float64(ops) / 1000000 + NintyNineLatNano := intervalStats.latNano[int64(math.Round(0.99*float64(ops))) - 1] + NinetyNineLat = float64(NintyNineLatNano) / 1000000 + } + seconds := float64(intervalNano) / 1000000000 + mbps := float64(intervalStats.bytes) / seconds / bytefmt.MEGABYTE + iops := float64(ops) / seconds + + log.Printf( + "Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d", + stats.loop, + intervalName, + seconds, + stats.mode, + ops, + mbps, + iops, + minLat, + avgLat, + NinetyNineLat, + maxLat, + intervalStats.slowdowns) + return true +} + +// Only safe to call from the calling thread +func (stats *Stats) updateIntervals(thread_num int) int64 { + curInterval := stats.threadStats[thread_num].curInterval + newInterval := stats.threadStats[thread_num].updateIntervals(stats.intervalNano) + + // Finish has already been called + if curInterval < 0 { + return -1 + } + + for i := curInterval; i < newInterval; i++ { + // load or store the current value + value, _ := stats.intervalCompletions.LoadOrStore(i, new(int32)) + cp, ok := value.(*int32) + if !ok { + log.Printf("updateIntervals: got data of type %T but wanted *int32", value) + continue + } + + count := atomic.AddInt32(cp, 1) + if count == int32(stats.threads) { + stats.logI(i) + } + } + return newInterval +} + +func (stats *Stats) addOp(thread_num int, bytes int64, latNano int64) { + + // Interval statistics + cur := stats.threadStats[thread_num].curInterval + if cur < 0 { + return + } + stats.threadStats[thread_num].intervals[cur].bytes += bytes + stats.threadStats[thread_num].intervals[cur].latNano = + append(stats.threadStats[thread_num].intervals[cur].latNano, latNano) +} + +func (stats *Stats) addSlowDown(thread_num int) { + cur := stats.threadStats[thread_num].curInterval + stats.threadStats[thread_num].intervals[cur].slowdowns++ +} + +func (stats *Stats) finish(thread_num int) { + stats.threadStats[thread_num].updateIntervals(stats.intervalNano) + stats.threadStats[thread_num].finish() + count := atomic.AddInt32(&stats.completions, 1) + if count == int32(stats.threads) { + stats.endNano = time.Now().UnixNano() + } +} + +func runUpload(thread_num int, fendtime time.Time, stats *Stats) { errcnt := 0 svc := s3.New(session.New(), cfg) for { if duration_secs > -1 && time.Now().After(endtime) { break } - objnum := atomic.AddInt64(&upload_count, 1) + objnum := atomic.AddInt64(&op_count, 1) bucket_num := objnum % int64(bucket_count) if object_count > -1 && objnum > object_count { - objnum = atomic.AddInt64(&upload_count, -1) + objnum = atomic.AddInt64(&op_count, -1) break } fileobj := bytes.NewReader(object_data) @@ -202,30 +433,36 @@ func runUpload(thread_num int) { Key: &key, Body: fileobj, } - + start := time.Now().UnixNano() req, _ := svc.PutObjectRequest(r) // Disable payload checksum calculation (very expensive) req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err := req.Send() + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + if err != nil { errcnt++ - atomic.AddInt64(&upload_slowdown_count, 1) - atomic.AddInt64(&upload_count, -1) + stats.addSlowDown(thread_num); + atomic.AddInt64(&op_count, -1) fmt.Println("upload err", err) - //break + } else { + // Update the stats + stats.addOp(thread_num, object_size, end-start) } if errcnt > 2 { break } - fmt.Fprintf(os.Stderr, "upload thread %5v, %v\r", thread_num, key) } // Remember last done time upload_finish = time.Now() // One less thread atomic.AddInt64(&running_threads, -1) + // stats are done + stats.finish(thread_num) } -func runDownload(thread_num int) { +func runDownload(thread_num int, fendtime time.Time, stats *Stats) { errcnt := 0 svc := s3.New(session.New(), cfg) for { @@ -233,9 +470,9 @@ func runDownload(thread_num int) { break } - objnum := atomic.AddInt64(&download_count, 1) + objnum := atomic.AddInt64(&op_count, 1) if objnum > object_count { - atomic.AddInt64(&download_count, -1) + atomic.AddInt64(&op_count, -1) break } @@ -246,37 +483,45 @@ func runDownload(thread_num int) { Key: &key, } + start := time.Now().UnixNano() req, resp := svc.GetObjectRequest(r) err := req.Send() + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + if err != nil { errcnt++ - atomic.AddInt64(&download_slowdown_count, 1) - atomic.AddInt64(&download_count, -1) + stats.addSlowDown(thread_num); fmt.Println("download err", err) - //break - } + } else { + // Update the stats + stats.addOp(thread_num, object_size, end-start) + } + if err == nil { _, err = io.Copy(ioutil.Discard, resp.Body) } if errcnt > 2 { break } - fmt.Fprintf(os.Stderr, "download thread %5v, %v\r", thread_num, key) + } // Remember last done time download_finish = time.Now() // One less thread atomic.AddInt64(&running_threads, -1) + // stats are done + stats.finish(thread_num) } -func runDelete(thread_num int) { +func runDelete(thread_num int, stats *Stats) { errcnt := 0 svc := s3.New(session.New(), cfg) for { - objnum := atomic.AddInt64(&delete_count, 1) + objnum := atomic.AddInt64(&op_count, 1) if objnum > object_count { - atomic.AddInt64(&delete_count, -1) + atomic.AddInt64(&op_count, -1) break } @@ -288,119 +533,84 @@ func runDelete(thread_num int) { Key: &key, } + start := time.Now().UnixNano() req, out := svc.DeleteObjectRequest(r) err := req.Send() + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + if err != nil { errcnt++ - atomic.AddInt64(&delete_slowdown_count, 1) - atomic.AddInt64(&delete_count, -1) + stats.addSlowDown(thread_num); fmt.Println("delete err", err, "out", out.String()) - } + } else { + // Update the stats + stats.addOp(thread_num, object_size, end-start) + } if errcnt > 2 { break } - fmt.Fprintf(os.Stderr, "delete thread %5v, %v\r", thread_num, key) } // Remember last done time delete_finish = time.Now() // One less thread atomic.AddInt64(&running_threads, -1) + // stats are done + stats.finish(thread_num) } var cfg *aws.Config -func init_buckets(loop int) { - // Initialize data for the bucket - object_data = make([]byte, object_size) - rand.Read(object_data) - hasher := md5.New() - hasher.Write(object_data) - object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) - +func initBuckets(loop int, stats *Stats) { // Create the buckets and delete all the objects - starttime := time.Now() for i := 0; i < bucket_count; i++ { + start := time.Now().UnixNano() createBucket(i, true) deleteAllObjects(i) + end := time.Now().UnixNano() + stats.updateIntervals(0) + stats.addOp(0, 0, end-start) } - init_time := time.Now().Sub(starttime).Seconds() - - logit(fmt.Sprintf("Loop %d: INIT time %.1f secs, buckets = %d, speed = %.1f buckets/sec.", - loop, init_time, bucket_count, float64(bucket_count)/init_time)) + stats.finish(0) } -func run_put(loop int) float64 { - // reset counters - upload_count = 0 - upload_slowdown_count = 0 +func runWrapper(loop int, r rune) { + op_count = 0 + running_threads = int64(threads) + intervalNano := int64(interval*1000000000) + endtime = time.Now().Add(time.Second * time.Duration(duration_secs)) + var stats Stats - running_threads = int64(threads) - starttime := time.Now() - endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 0; n < threads; n++ { - go runUpload(n) + switch r { + case 'i': + log.Printf("Running Loop %d Init", loop) + stats = makeStats(loop, "INIT", 1, intervalNano) + initBuckets(loop, &stats); + running_threads = 0; + case 'p': + log.Printf("Running Loop %d Put Test", loop) + stats = makeStats(loop, "PUT", threads, intervalNano) + for n := 0; n < threads; n++ { + go runUpload(n, endtime, &stats); + } + case 'g': + log.Printf("Running Loop %d Get Test", loop) + stats = makeStats(loop, "GET", threads, intervalNano) + for n := 0; n < threads; n++ { + go runDownload(n, endtime, &stats); + } + case 'd': + log.Printf("Running Loop %d Del Test", loop) + stats = makeStats(loop, "DEL", threads, intervalNano) + for n := 0; n < threads; n++ { + go runDelete(n, &stats); + } } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - upload_time := upload_finish.Sub(starttime).Seconds() - - bps := float64(uint64(upload_count)*object_size) / upload_time - logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", - loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count)) - - return bps / bytefmt.MEGABYTE -} - -func run_get(loop int) float64 { - // reset counters - download_count = 0 - download_slowdown_count = 0 - - running_threads = int64(threads) - starttime := time.Now() - endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 0; n < threads; n++ { - go runDownload(n) - } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - download_time := download_finish.Sub(starttime).Seconds() - - bps := float64(uint64(download_count)*object_size) / download_time - logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", - loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count)) - - return bps / bytefmt.MEGABYTE -} - -func run_delete(loop int) float64 { - // reset counters - delete_count = 0 - delete_slowdown_count = 0 - - running_threads = int64(threads) - starttime := time.Now() - for n := 0; n < threads; n++ { - go runDelete(n) - } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - delete_time := delete_finish.Sub(starttime).Seconds() - - bps := float64(uint64(delete_count)*object_size) / delete_time - logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d", - loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count)) - - return bps / bytefmt.MEGABYTE + // Wait for it to finish + for atomic.LoadInt64(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + stats.log() } func init() { @@ -419,7 +629,7 @@ func init() { myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") - + myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals") // define custom usage output with notes notes := ` @@ -471,9 +681,20 @@ NOTES: log.Fatal("Invalid modes passed to -m, see help for details.") } var err error - if object_size, err = bytefmt.ToBytes(sizeArg); err != nil { + var size uint64 + if size, err = bytefmt.ToBytes(sizeArg); err != nil { log.Fatalf("Invalid -z argument for object size: %v", err) } + object_size = int64(size) +} + +func initData() { + // Initialize data for the bucket + object_data = make([]byte, object_size) + rand.Read(object_data) + hasher := md5.New() + hasher.Write(object_data) + object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) } func main() { @@ -494,30 +715,18 @@ func main() { logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s", url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg)) + // Init Data + initData() + // Setup the slice of buckets for i := 0; i < bucket_count; i++ { buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) } - var uploadspeed, downloadspeed float64 // Loop running the tests - for loop := 1; loop <= loops; loop++ { - for _, r := range modes { - switch r { - case 'i': - init_buckets(loop); - case 'p': - uploadspeed = run_put(loop); - case 'g': - downloadspeed = run_get(loop); - case 'd': - run_delete(loop); - } + for loop := 0; loop < loops; loop++ { + for _, r := range modes { + runWrapper(loop, r) } } - - // All done - name := strings.Split(strings.TrimPrefix(url_host, "http://"), ".")[0] - fmt.Printf("result title: name-concurrency-size, uloadspeed, downloadspeed\n") - fmt.Printf("result csv: %v-%v-%v,%.2f,%.2f\n", name, threads, sizeArg, uploadspeed, downloadspeed) }