diff --git a/s3-benchmark.go b/s3-benchmark.go index 2a46cf1..fe9ccfa 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -32,7 +32,7 @@ import ( ) // Global variables -var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, sizeArg string +var access_key, secret_key, url_host, bucket_prefix, object_prefix, region, modes, sizeArg string var buckets []string var duration_secs, threads, loops, bucket_count int var object_data []byte @@ -309,6 +309,100 @@ func runDelete(thread_num int) { var cfg *aws.Config +func init_buckets(loop int) { + // Initialize data for the bucket + object_data = make([]byte, object_size) + rand.Read(object_data) + hasher := md5.New() + hasher.Write(object_data) + object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) + + // Create the buckets and delete all the objects + starttime := time.Now() + for i := 0; i < bucket_count; i++ { + createBucket(i, true) + deleteAllObjects(i) + } + init_time := time.Now().Sub(starttime).Seconds() + + logit(fmt.Sprintf("Loop %d: INIT time %.1f secs, buckets = %d, speed = %.1f buckets/sec.", + loop, init_time, bucket_count, float64(bucket_count)/init_time)) +} + +func run_put(loop int) float64 { + // reset counters + upload_count = 0 + upload_slowdown_count = 0 + + running_threads = int64(threads) + starttime := time.Now() + endtime = starttime.Add(time.Second * time.Duration(duration_secs)) + for n := 0; n < threads; n++ { + go runUpload(n) + } + + // Wait for it to finish + for atomic.LoadInt64(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + upload_time := upload_finish.Sub(starttime).Seconds() + + bps := float64(uint64(upload_count)*object_size) / upload_time + logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", + loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count)) + + return bps / bytefmt.MEGABYTE +} + +func run_get(loop int) float64 { + // reset counters + download_count = 0 + download_slowdown_count = 0 + + running_threads = int64(threads) + starttime := time.Now() + endtime = starttime.Add(time.Second * time.Duration(duration_secs)) + for n := 0; n < threads; n++ { + go runDownload(n) + } + + // Wait for it to finish + for atomic.LoadInt64(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + download_time := download_finish.Sub(starttime).Seconds() + + bps := float64(uint64(download_count)*object_size) / download_time + logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", + loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count)) + + return bps / bytefmt.MEGABYTE +} + +func run_delete(loop int) float64 { + // reset counters + delete_count = 0 + delete_slowdown_count = 0 + + running_threads = int64(threads) + starttime := time.Now() + for n := 0; n < threads; n++ { + go runDelete(n) + } + + // Wait for it to finish + for atomic.LoadInt64(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + delete_time := delete_finish.Sub(starttime).Seconds() + + bps := float64(uint64(delete_count)*object_size) / delete_time + logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d", + loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count)) + + return bps / bytefmt.MEGABYTE +} + func init() { // Parse command line myflag := flag.NewFlagSet("myflag", flag.ExitOnError) @@ -318,12 +412,36 @@ func init() { myflag.StringVar(&object_prefix, "op", "", "Prefix for objects") myflag.StringVar(&bucket_prefix, "bp", "hotsauce_bench", "Prefix for buckets") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") + myflag.StringVar(&modes, "m", "ipgd", "Run modes in order. See NOTES for more info") myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>") myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across") myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") + + // define custom usage output with notes + notes := +` +NOTES: + - Valid mode types for the -m mode string are: + i: initialize buckets and clear any existing objects + p: put objects in buckets + g: get objects from buckets + d: delete objects from buckets + + These modes are processed in-order and can be repeated, ie "ippgd" will + initialize the buckets, put the objects, reput the objects, get the + objects, and then delete the objects. The repeat flag will repeat this + whole process the specified number of times. +` + myflag.Usage = func() { + fmt.Fprintf(flag.CommandLine.Output(), "\nUSAGE: %s [OPTIONS]\n\n", os.Args[0]) + fmt.Fprintf(flag.CommandLine.Output(), "OPTIONS:\n") + myflag.PrintDefaults() + fmt.Fprintf(flag.CommandLine.Output(), notes); + } + if err := myflag.Parse(os.Args[1:]); err != nil { os.Exit(1) } @@ -341,6 +459,17 @@ func init() { if url_host == "" { log.Fatal("Missing argument -s for host endpoint.") } + invalid_mode := false + for _, r := range modes { + if (r != 'i' && r != 'p' && r != 'g' && r != 'd') { + s := fmt.Sprintf("Invalid mode '%s' passed to -m", string(r)) + log.Printf(s) + invalid_mode = true + } + } + if invalid_mode { + log.Fatal("Invalid modes passed to -m, see help for details.") + } var err error if object_size, err = bytefmt.ToBytes(sizeArg); err != nil { log.Fatalf("Invalid -z argument for object size: %v", err) @@ -365,89 +494,26 @@ func main() { logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s", url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg)) - // Initialize data for the bucket - object_data = make([]byte, object_size) - rand.Read(object_data) - hasher := md5.New() - hasher.Write(object_data) - object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) - - // Create the buckets and delete all the objects + // Setup the slice of buckets for i := 0; i < bucket_count; i++ { - buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) - createBucket(i, true) - deleteAllObjects(i) - } + buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i)) + } var uploadspeed, downloadspeed float64 - // Loop running the tests for loop := 1; loop <= loops; loop++ { - - // reset counters - upload_count = 0 - upload_slowdown_count = 0 - download_count = 0 - download_slowdown_count = 0 - delete_count = 0 - delete_slowdown_count = 0 - - // Run the upload case - running_threads = int64(threads) - starttime := time.Now() - endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 0; n < threads; n++ { - go runUpload(n) + for _, r := range modes { + switch r { + case 'i': + init_buckets(loop); + case 'p': + uploadspeed = run_put(loop); + case 'g': + downloadspeed = run_get(loop); + case 'd': + run_delete(loop); + } } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - upload_time := upload_finish.Sub(starttime).Seconds() - - bps := float64(uint64(upload_count)*object_size) / upload_time - logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", - loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count)) - - uploadspeed = bps / bytefmt.MEGABYTE - - // Run the download case - running_threads = int64(threads) - starttime = time.Now() - endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 0; n < threads; n++ { - go runDownload(n) - } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - download_time := download_finish.Sub(starttime).Seconds() - - bps = float64(uint64(download_count)*object_size) / download_time - logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", - loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count)) - - downloadspeed = bps / bytefmt.MEGABYTE - - // Run the delete case - running_threads = int64(threads) - starttime = time.Now() - endtime = starttime.Add(time.Second * time.Duration(duration_secs)) - for n := 0; n < threads; n++ { - go runDelete(n) - } - - // Wait for it to finish - for atomic.LoadInt64(&running_threads) > 0 { - time.Sleep(time.Millisecond) - } - delete_time := delete_finish.Sub(starttime).Seconds() - - logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d", - loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count)) } // All done