From 4652f6d43e5f9d4f0a7bc474fb6c043bd83cc5d6 Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Thu, 15 Aug 2019 16:06:28 -0400 Subject: [PATCH] s3-benchmark: refactor and separate out bucket tests Signed-off-by: Mark Nelson --- s3-benchmark.go | 187 +++++++++++++++++++++++++++++------------------- 1 file changed, 114 insertions(+), 73 deletions(-) diff --git a/s3-benchmark.go b/s3-benchmark.go index 9979021..8c74e07 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -40,7 +40,7 @@ var duration_secs, threads, loops int var object_data []byte var object_data_md5 string var running_threads, bucket_count, object_count, object_size, op_counter int64 -var endtime, upload_finish, download_finish, delete_finish time.Time +var endtime time.Time var interval float64 func logit(msg string) { @@ -96,47 +96,6 @@ func getS3Client() *s3.S3 { return client } -func createBucket(bucket_num int64, ignore_errors bool) { - svc := s3.New(session.New(), cfg) - in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} - if _, err := svc.CreateBucket(in); err != nil { - if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) || - strings.Contains(err.Error(), "BucketAlreadyExists") { - return - } - if ignore_errors { - log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err) - } else { - log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) - } - } -} - -func deleteAllObjects(bucket_num int64) { - svc := s3.New(session.New(), cfg) - out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) - if err != nil { - log.Fatal("can't list objects") - } - n := len(out.Contents) - for n > 0 { - fmt.Printf("got existing %v objects, try to delete now...\n", n) - - for _, v := range out.Contents { - svc.DeleteObject(&s3.DeleteObjectInput{ - Bucket: &buckets[bucket_num], - Key: v.Key, - }) - } - out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) - if err != nil { - log.Fatal("can't list objects") - } - n = len(out.Contents) - fmt.Printf("after delete, got %v objects\n", n) - } -} - // canonicalAmzHeaders -- return the x-amz headers canonicalized func canonicalAmzHeaders(req *http.Request) string { // Parse out all x-amz headers @@ -445,7 +404,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { errcnt++ stats.addSlowDown(thread_num); atomic.AddInt64(&op_counter, -1) - fmt.Println("upload err", err) + log.Printf("upload err", err) } else { // Update the stats stats.addOp(thread_num, object_size, end-start) @@ -454,11 +413,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { break } } - // Remember last done time - upload_finish = time.Now() - // One less thread atomic.AddInt64(&running_threads, -1) - // stats are done stats.finish(thread_num) } @@ -492,7 +447,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) { if err != nil { errcnt++ stats.addSlowDown(thread_num); - fmt.Println("download err", err) + log.Printf("download err", err) } else { // Update the stats stats.addOp(thread_num, object_size, end-start) @@ -506,11 +461,7 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) { } } - // Remember last done time - download_finish = time.Now() - // One less thread atomic.AddInt64(&running_threads, -1) - // stats are done stats.finish(thread_num) } @@ -542,7 +493,7 @@ func runDelete(thread_num int, stats *Stats) { if err != nil { errcnt++ stats.addSlowDown(thread_num); - fmt.Println("delete err", err, "out", out.String()) + log.Printf("delete err", err, "out", out.String()) } else { // Update the stats stats.addOp(thread_num, object_size, end-start) @@ -551,35 +502,104 @@ func runDelete(thread_num int, stats *Stats) { break } } - // Remember last done time - delete_finish = time.Now() - // One less thread atomic.AddInt64(&running_threads, -1) - // stats are done + stats.finish(thread_num) +} + +func runBucketDelete(thread_num int, stats *Stats) { + svc := s3.New(session.New(), cfg) + + for { + bucket_num := atomic.AddInt64(&op_counter, 1) + if bucket_num >= bucket_count { + atomic.AddInt64(&op_counter, -1) + break + } + r := &s3.DeleteBucketInput{ + Bucket: &buckets[bucket_num], + } + + start := time.Now().UnixNano() + _, err := svc.DeleteBucket(r) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + + if err != nil { + break + } + stats.addOp(thread_num, 0, end-start) + } + atomic.AddInt64(&running_threads, -1) stats.finish(thread_num) } var cfg *aws.Config -func initBuckets(thread_num int, stats *Stats) { - // Create the buckets and delete all the objects +func runBucketsInit(thread_num int, stats *Stats) { + svc := s3.New(session.New(), cfg) + for { - bucket_num := atomic.AddInt64(&op_counter, 1) + bucket_num := atomic.AddInt64(&op_counter, 1) if bucket_num >= bucket_count { atomic.AddInt64(&op_counter, -1) break } start := time.Now().UnixNano() - createBucket(bucket_num, true) - deleteAllObjects(bucket_num) + in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} + _, err := svc.CreateBucket(in) end := time.Now().UnixNano() stats.updateIntervals(thread_num) + + if err != nil { + if !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) && + !strings.Contains(err.Error(), "BucketAlreadyExists") { + log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err) + } + } stats.addOp(thread_num, 0, end-start) } atomic.AddInt64(&running_threads, -1) stats.finish(thread_num) } +func runBucketsClear(thread_num int, stats *Stats) { + svc := s3.New(session.New(), cfg) + + for { + bucket_num := atomic.AddInt64(&op_counter, 1) + if bucket_num >= bucket_count { + atomic.AddInt64(&op_counter, -1) + break + } + out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) + if err != nil { + break + } + n := len(out.Contents) + for n > 0 { + for _, v := range out.Contents { + start := time.Now().UnixNano() + svc.DeleteObject(&s3.DeleteObjectInput{ + Bucket: &buckets[bucket_num], + Key: v.Key, + }) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + stats.addOp(thread_num, *v.Size, end-start) + + } + out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) + if err != nil { + break + } + n = len(out.Contents) + } + } + atomic.AddInt64(&running_threads, -1) + stats.finish(thread_num) +} + + func runWrapper(loop int, r rune) { op_counter = -1 running_threads = int64(threads) @@ -588,31 +608,44 @@ func runWrapper(loop int, r rune) { var stats Stats switch r { + case 'c': + log.Printf("Running Loop %d BUCKET CLEAR TEST", loop) + stats = makeStats(loop, "BCLR", threads, intervalNano) + for n := 0; n < threads; n++ { + go runBucketsClear(n, &stats); + } + case 'x': + log.Printf("Running Loop %d BUCKET DELETE TEST", loop) + stats = makeStats(loop, "BDEL", threads, intervalNano) + for n := 0; n < threads; n++ { + go runBucketDelete(n, &stats); + } case 'i': - log.Printf("Running Loop %d Init", loop) - stats = makeStats(loop, "INIT", threads, intervalNano) + log.Printf("Running Loop %d BUCKET INIT TEST", loop) + stats = makeStats(loop, "BINIT", threads, intervalNano) for n := 0; n < threads; n++ { - go initBuckets(n, &stats); + go runBucketsInit(n, &stats); } case 'p': - log.Printf("Running Loop %d Put Test", loop) + log.Printf("Running Loop %d OBJECT PUT TEST", loop) stats = makeStats(loop, "PUT", threads, intervalNano) for n := 0; n < threads; n++ { go runUpload(n, endtime, &stats); } case 'g': - log.Printf("Running Loop %d Get Test", loop) + log.Printf("Running Loop %d OBJECT GET TEST", loop) stats = makeStats(loop, "GET", threads, intervalNano) for n := 0; n < threads; n++ { go runDownload(n, endtime, &stats); } case 'd': - log.Printf("Running Loop %d Del Test", loop) + log.Printf("Running Loop %d OBJECT DELETE TEST", loop) stats = makeStats(loop, "DEL", threads, intervalNano) for n := 0; n < threads; n++ { go runDelete(n, &stats); } - } + } + // Wait for it to finish for atomic.LoadInt64(&running_threads) > 0 { time.Sleep(time.Millisecond) @@ -629,7 +662,7 @@ func init() { myflag.StringVar(&object_prefix, "op", "", "Prefix for objects") myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") - myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info") + myflag.StringVar(&modes, "m", "cxipgdx", "Run modes in order. See NOTES for more info") myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") @@ -642,10 +675,12 @@ func init() { ` NOTES: - Valid mode types for the -m mode string are: - i: initialize buckets and clear any existing objects + c: clear all existing objects from buckets (requires lookups) + x: delete buckets + i: initialize buckets p: put objects in buckets g: get objects from buckets - d: delete objects from buckets + d: delete objects from buckets These modes are processed in-order and can be repeated, ie "ippgd" will initialize the buckets, put the objects, reput the objects, get the @@ -678,7 +713,13 @@ NOTES: } invalid_mode := false for _, r := range modes { - if (r != 'i' && r != 'p' && r != 'g' && r != 'd') { + if ( + r != 'i' && + r != 'c' && + r != 'p' && + r != 'g' && + r != 'd' && + r != 'x') { s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r)) log.Printf(s) invalid_mode = true