From 389e714111384b95745d970c1988df2c3333bd57 Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Sun, 11 Aug 2019 19:26:21 -0400 Subject: [PATCH] s3-benchmark: Implement multiple buckets. Signed-off-by: Mark Nelson --- s3-benchmark.go | 101 +++++++++++++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/s3-benchmark.go b/s3-benchmark.go index fd382ac..855986b 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -33,8 +33,9 @@ import ( ) // Global variables -var access_key, secret_key, url_host, bucket, region, sizeArg string -var duration_secs, threads, loops int +var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string +var buckets []string +var duration_secs, threads, loops, bucket_count int var object_size uint64 var object_data []byte var object_data_md5 string @@ -94,34 +95,35 @@ func getS3Client() *s3.S3 { return client } -func createBucket(ignore_errors bool) { +func createBucket(bucket_num int, ignore_errors bool) { // Get a client // client := getS3Client() // Create our bucket (may already exist without error) svc := s3.New(session.New(), cfg) - in := &s3.CreateBucketInput{Bucket: aws.String(bucket)} + log.Printf(buckets[bucket_num]) + in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} if _, err := svc.CreateBucket(in); err != nil { if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) || strings.Contains(err.Error(), "BucketAlreadyExists") { return } if ignore_errors { - log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err) + log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err) } else { - log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err) + log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) } } } -func deleteAllObjects() { +func deleteAllObjects(bucket_num int) { // Get a client // client := getS3Client() svc := s3.New(session.New(), cfg) - // in := &s3.DeleteBucketInput{Bucket: aws.String(bucket)} + // in := &s3.DeleteBucketInput{Bucket: aws.String(buckets[bucket_num])} // if _, err := svc.DeleteBucket(in); err != nil { - // log.Printf("FATAL: Unable to delete bucket %s : %v", bucket, err) + // log.Printf("FATAL: Unable to delete bucket %s : %v", buckets[bucket_num], err) // } - out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket}) + out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) if err != nil { log.Fatal("can't list objects") } @@ -133,11 +135,11 @@ func deleteAllObjects() { for _, v := range out.Contents { svc.DeleteObject(&s3.DeleteObjectInput{ - Bucket: &bucket, + Bucket: &buckets[bucket_num], Key: v.Key, }) } - out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket}) + out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) if err != nil { log.Fatal("can't list objects") } @@ -239,16 +241,17 @@ func setSignature(req *http.Request) { } func runUpload(thread_num int, keys *sync.Map) { + bucket_num := thread_num % bucket_count errcnt := 0 svc := s3.New(session.New(), cfg) for time.Now().Before(endtime) { objnum := atomic.AddInt32(&upload_count, 1) fileobj := bytes.NewReader(object_data) - //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) + //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum) key := fmt.Sprintf("Object-%d", objnum) r := &s3.PutObjectInput{ - Bucket: &bucket, + Bucket: &buckets[bucket_num], Key: &key, Body: fileobj, } @@ -297,6 +300,7 @@ func runUpload(thread_num int, keys *sync.Map) { } func runDownload(thread_num int, keys *sync.Map) { + bucket_num := thread_num % bucket_count errcnt := 0 svc := s3.New(session.New(), cfg) @@ -321,7 +325,7 @@ func runDownload(thread_num int, keys *sync.Map) { fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key) r := &s3.GetObjectInput{ - Bucket: &bucket, + Bucket: &buckets[bucket_num], Key: &key, } @@ -365,17 +369,29 @@ func runDownload(thread_num int, keys *sync.Map) { atomic.AddInt32(&running_threads, -1) } -func runDelete(thread_num int) { +func runDelete(thread_num int, keys *sync.Map) { + bucket_num := thread_num % bucket_count errcnt := 0 svc := s3.New(session.New(), cfg) - for { - objnum := atomic.AddInt32(&delete_count, 1) - if objnum > upload_count { - break - } - key := fmt.Sprintf("Object-%d", objnum) + + keys.Range(func(k, value interface{}) bool { + + // objnum := atomic.AddInt32(&delete_count, 1) + // if objnum > upload_count { + // break + // } + // key := fmt.Sprintf("Object-%d", objnum) + + var key string + var ok bool + if key, ok = k.(string); !ok { + log.Fatal("convert key back error") + } + + fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key) + r := &s3.DeleteObjectInput{ - Bucket: &bucket, + Bucket: &buckets[bucket_num], Key: &key, } @@ -389,7 +405,7 @@ func runDelete(thread_num int) { //break } if errcnt > 2 { - break + return false } fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key) @@ -402,7 +418,10 @@ func runDelete(thread_num int) { // atomic.AddInt32(&delete_slowdown_count, 1) // atomic.AddInt32(&delete_count, -1) // } - } + + return true + }) + // Remember last done time delete_finish = time.Now() // One less thread @@ -417,7 +436,8 @@ func init() { myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key") myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key") myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix") - myflag.StringVar(&bucket, "b", "loadgen", "Bucket for testing") + myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets") + myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds") myflag.IntVar(&threads, "t", 1, "Number of threads to run") @@ -458,8 +478,8 @@ func main() { } // Echo the parameters - logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s", - url_host, bucket, region, duration_secs, threads, loops, sizeArg)) + logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s", + url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg)) // Initialize data for the bucket object_data = make([]byte, object_size) @@ -468,9 +488,12 @@ func main() { hasher.Write(object_data) object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) - // Create the bucket and delete all the objects - createBucket(true) - deleteAllObjects() + // Create the buckets and delete all the objects + for i := 0; i < bucket_count; i++ { + buckets = append(buckets, fmt.Sprintf("%s-%d", bucket_prefix, i)) + createBucket(i, true) + deleteAllObjects(i) + } var uploadspeed, downloadspeed float64 @@ -485,14 +508,14 @@ func main() { delete_count = 0 delete_slowdown_count = 0 - keys := &sync.Map{} - + var keys []*sync.Map // Run the upload case running_threads = int32(threads) starttime := time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 1; n <= threads; n++ { - go runUpload(n, keys) + for n := 0; n < threads; n++ { + keys = append(keys, &sync.Map{}) + go runUpload(n, keys[n]) } // Wait for it to finish @@ -517,8 +540,8 @@ func main() { running_threads = int32(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 1; n <= threads; n++ { - go runDownload(n, keys) + for n := 0; n < threads; n++ { + go runDownload(n, keys[n]) } // Wait for it to finish @@ -537,8 +560,8 @@ func main() { running_threads = int32(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 1; n <= threads; n++ { - go runDelete(n) + for n := 0; n < threads; n++ { + go runDelete(n, keys[n]) } // Wait for it to finish