From 60225d82d7ca2f4b9ce8df6b6ea9dcc323909c80 Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Sun, 11 Aug 2019 20:50:46 -0400 Subject: [PATCH] s3-benchmark: Add maximum object count setting. Signed-off-by: Mark Nelson --- s3-benchmark.go | 55 +++++++++++++++++++++++++++++-------------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/s3-benchmark.go b/s3-benchmark.go index 9a4062c..e1dd05c 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -36,10 +36,10 @@ import ( var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string var buckets []string var duration_secs, threads, loops, bucket_count int -var object_size uint64 var object_data []byte var object_data_md5 string -var running_threads, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int32 +var object_size uint64 +var running_threads, object_count, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int64 var endtime, upload_finish, download_finish, delete_finish time.Time func logit(msg string) { @@ -185,8 +185,15 @@ func runUpload(thread_num int, keys *sync.Map) { bucket_num := thread_num % bucket_count errcnt := 0 svc := s3.New(session.New(), cfg) - for time.Now().Before(endtime) { - objnum := atomic.AddInt32(&upload_count, 1) + for { + if duration_secs > -1 && time.Now().After(endtime) { + break + } + objnum := atomic.AddInt64(&upload_count, 1) + if object_count > -1 && objnum > object_count { + objnum = atomic.AddInt64(&upload_count, -1) + break + } fileobj := bytes.NewReader(object_data) //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum) @@ -203,8 +210,8 @@ func runUpload(thread_num int, keys *sync.Map) { err := req.Send() if err != nil { errcnt++ - atomic.AddInt32(&upload_slowdown_count, 1) - atomic.AddInt32(&upload_count, -1) + atomic.AddInt64(&upload_slowdown_count, 1) + atomic.AddInt64(&upload_count, -1) fmt.Println("upload err", err) //break } @@ -217,7 +224,7 @@ func runUpload(thread_num int, keys *sync.Map) { // Remember last done time upload_finish = time.Now() // One less thread - atomic.AddInt32(&running_threads, -1) + atomic.AddInt64(&running_threads, -1) } func runDownload(thread_num int, keys *sync.Map) { @@ -226,7 +233,7 @@ func runDownload(thread_num int, keys *sync.Map) { svc := s3.New(session.New(), cfg) keys.Range(func(k, value interface{}) bool { - if time.Now().After(endtime) { + if duration_secs > -1 && time.Now().After(endtime) { // fmt.Println("time ended for download") return false } @@ -247,8 +254,8 @@ func runDownload(thread_num int, keys *sync.Map) { err := req.Send() if err != nil { errcnt++ - atomic.AddInt32(&download_slowdown_count, 1) - atomic.AddInt32(&download_count, -1) + atomic.AddInt64(&download_slowdown_count, 1) + atomic.AddInt64(&download_count, -1) fmt.Println("download err", err) //break } @@ -258,14 +265,14 @@ func runDownload(thread_num int, keys *sync.Map) { if errcnt > 2 { return false } - atomic.AddInt32(&download_count, 1) + atomic.AddInt64(&download_count, 1) return true }) // Remember last done time download_finish = time.Now() // One less thread - atomic.AddInt32(&running_threads, -1) + atomic.AddInt64(&running_threads, -1) } func runDelete(thread_num int, keys *sync.Map) { @@ -291,8 +298,8 @@ func runDelete(thread_num int, keys *sync.Map) { err := req.Send() if err != nil { errcnt++ - atomic.AddInt32(&delete_slowdown_count, 1) - atomic.AddInt32(&delete_count, -1) + atomic.AddInt64(&delete_slowdown_count, 1) + atomic.AddInt64(&delete_count, -1) fmt.Println("download err", err, "out", out.String()) //break } @@ -306,7 +313,7 @@ func runDelete(thread_num int, keys *sync.Map) { // Remember last done time delete_finish = time.Now() // One less thread - atomic.AddInt32(&running_threads, -1) + atomic.AddInt64(&running_threads, -1) } var cfg *aws.Config @@ -320,7 +327,8 @@ func init() { myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets") myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") - myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds") + myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") + myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") @@ -329,6 +337,9 @@ func init() { } // Check the arguments + if object_count < 0 && duration_secs < 0 { + log.Fatal("The number of objects and duration can not both be unlimited") + } if access_key == "" { log.Fatal("Missing argument -a for access key.") } @@ -391,7 +402,7 @@ func main() { var keys []*sync.Map // Run the upload case - running_threads = int32(threads) + running_threads = int64(threads) starttime := time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { @@ -400,7 +411,7 @@ func main() { } // Wait for it to finish - for atomic.LoadInt32(&running_threads) > 0 { + for atomic.LoadInt64(&running_threads) > 0 { time.Sleep(time.Millisecond) } upload_time := upload_finish.Sub(starttime).Seconds() @@ -412,7 +423,7 @@ func main() { uploadspeed = bps / bytefmt.MEGABYTE // Run the download case - running_threads = int32(threads) + running_threads = int64(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { @@ -420,7 +431,7 @@ func main() { } // Wait for it to finish - for atomic.LoadInt32(&running_threads) > 0 { + for atomic.LoadInt64(&running_threads) > 0 { time.Sleep(time.Millisecond) } download_time := download_finish.Sub(starttime).Seconds() @@ -432,7 +443,7 @@ func main() { downloadspeed = bps / bytefmt.MEGABYTE // Run the delete case - running_threads = int32(threads) + running_threads = int64(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { @@ -440,7 +451,7 @@ func main() { } // Wait for it to finish - for atomic.LoadInt32(&running_threads) > 0 { + for atomic.LoadInt64(&running_threads) > 0 { time.Sleep(time.Millisecond) } delete_time := delete_finish.Sub(starttime).Seconds()