diff --git a/s3-benchmark.go b/s3-benchmark.go index 629a982..9979021 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -36,10 +36,10 @@ import ( // Global variables var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, modes, sizeArg string var buckets []string -var duration_secs, threads, loops, bucket_count int +var duration_secs, threads, loops int var object_data []byte var object_data_md5 string -var running_threads, object_count, object_size, op_count int64 +var running_threads, bucket_count, object_count, object_size, op_counter int64 var endtime, upload_finish, download_finish, delete_finish time.Time var interval float64 @@ -96,7 +96,7 @@ func getS3Client() *s3.S3 { return client } -func createBucket(bucket_num int, ignore_errors bool) { +func createBucket(bucket_num int64, ignore_errors bool) { svc := s3.New(session.New(), cfg) in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])} if _, err := svc.CreateBucket(in); err != nil { @@ -112,7 +112,7 @@ func createBucket(bucket_num int, ignore_errors bool) { } } -func deleteAllObjects(bucket_num int) { +func deleteAllObjects(bucket_num int64) { svc := s3.New(session.New(), cfg) out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) if err != nil { @@ -419,10 +419,10 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { if duration_secs > -1 && time.Now().After(endtime) { break } - objnum := atomic.AddInt64(&op_count, 1) + objnum := atomic.AddInt64(&op_counter, 1) bucket_num := objnum % int64(bucket_count) - if object_count > -1 && objnum > object_count { - objnum = atomic.AddInt64(&op_count, -1) + if object_count > -1 && objnum >= object_count { + objnum = atomic.AddInt64(&op_counter, -1) break } fileobj := bytes.NewReader(object_data) @@ -444,7 +444,7 @@ func runUpload(thread_num int, fendtime time.Time, stats *Stats) { if err != nil { errcnt++ stats.addSlowDown(thread_num); - atomic.AddInt64(&op_count, -1) + atomic.AddInt64(&op_counter, -1) fmt.Println("upload err", err) } else { // Update the stats @@ -470,9 +470,9 @@ func runDownload(thread_num int, fendtime time.Time, stats *Stats) { break } - objnum := atomic.AddInt64(&op_count, 1) - if objnum > object_count { - atomic.AddInt64(&op_count, -1) + objnum := atomic.AddInt64(&op_counter, 1) + if objnum >= object_count { + atomic.AddInt64(&op_counter, -1) break } @@ -519,9 +519,9 @@ func runDelete(thread_num int, stats *Stats) { svc := s3.New(session.New(), cfg) for { - objnum := atomic.AddInt64(&op_count, 1) - if objnum > object_count { - atomic.AddInt64(&op_count, -1) + objnum := atomic.AddInt64(&op_counter, 1) + if objnum >= object_count { + atomic.AddInt64(&op_counter, -1) break } @@ -561,21 +561,27 @@ func runDelete(thread_num int, stats *Stats) { var cfg *aws.Config -func initBuckets(loop int, stats *Stats) { +func initBuckets(thread_num int, stats *Stats) { // Create the buckets and delete all the objects - for i := 0; i < bucket_count; i++ { - start := time.Now().UnixNano() - createBucket(i, true) - deleteAllObjects(i) - end := time.Now().UnixNano() - stats.updateIntervals(0) - stats.addOp(0, 0, end-start) + for { + bucket_num := atomic.AddInt64(&op_counter, 1) + if bucket_num >= bucket_count { + atomic.AddInt64(&op_counter, -1) + break + } + start := time.Now().UnixNano() + createBucket(bucket_num, true) + deleteAllObjects(bucket_num) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) + stats.addOp(thread_num, 0, end-start) } - stats.finish(0) + atomic.AddInt64(&running_threads, -1) + stats.finish(thread_num) } func runWrapper(loop int, r rune) { - op_count = 0 + op_counter = -1 running_threads = int64(threads) intervalNano := int64(interval*1000000000) endtime = time.Now().Add(time.Second * time.Duration(duration_secs)) @@ -584,9 +590,10 @@ func runWrapper(loop int, r rune) { switch r { case 'i': log.Printf("Running Loop %d Init", loop) - stats = makeStats(loop, "INIT", 1, intervalNano) - initBuckets(loop, &stats); - running_threads = 0; + stats = makeStats(loop, "INIT", threads, intervalNano) + for n := 0; n < threads; n++ { + go initBuckets(n, &stats); + } case 'p': log.Printf("Running Loop %d Put Test", loop) stats = makeStats(loop, "PUT", threads, intervalNano) @@ -624,7 +631,7 @@ func init() { myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info") myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") - myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") + myflag.Int64Var(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") @@ -719,7 +726,7 @@ func main() { initData() // Setup the slice of buckets - for i := 0; i < bucket_count; i++ { + for i := int64(0); i < bucket_count; i++ { buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) }