// s3-benchmark.go // Copyright (c) 2017 Wasabi Technology, Inc. package main import ( "bytes" "crypto/hmac" "crypto/md5" "crypto/sha1" "crypto/tls" "encoding/base64" "flag" "fmt" "io" "io/ioutil" "log" "math/rand" "net" "net/http" "os" "sort" "strings" "sync" "sync/atomic" "time" "code.cloudfoundry.org/bytefmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" ) // Global variables var access_key, secret_key, url_host, bucket, region, sizeArg string var duration_secs, threads, loops int var object_size uint64 var object_data []byte var object_data_md5 string var running_threads, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int32 var endtime, upload_finish, download_finish, delete_finish time.Time func logit(msg string) { fmt.Println(msg) logfile, _ := os.OpenFile("benchmark.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) if logfile != nil { logfile.WriteString(time.Now().Format(http.TimeFormat) + ": " + msg + "\n") logfile.Close() } } // Our HTTP transport used for the roundtripper below var HTTPTransport http.RoundTripper = &http.Transport{ Proxy: http.ProxyFromEnvironment, Dial: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, }).Dial, TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 0, // Allow an unlimited number of idle connections MaxIdleConnsPerHost: 4096, MaxIdleConns: 0, // But limit their idle time IdleConnTimeout: time.Minute, // Ignore TLS errors TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, } var httpClient = &http.Client{Transport: HTTPTransport} func getS3Client() *s3.S3 { // Build our config creds := credentials.NewStaticCredentials(access_key, secret_key, "") loglevel := aws.LogOff // Build the rest of the configuration awsConfig := &aws.Config{ Region: aws.String(region), Endpoint: aws.String(url_host), Credentials: creds, LogLevel: &loglevel, S3ForcePathStyle: aws.Bool(true), S3Disable100Continue: aws.Bool(true), // Comment following to use default transport HTTPClient: &http.Client{Transport: HTTPTransport}, } session := session.New(awsConfig) client := s3.New(session) if client == nil { log.Fatalf("FATAL: Unable to create new client.") } // Return success return client } func createBucket(ignore_errors bool) { // Get a client // client := getS3Client() // Create our bucket (may already exist without error) svc := s3.New(session.New(), cfg) in := &s3.CreateBucketInput{Bucket: aws.String(bucket)} if _, err := svc.CreateBucket(in); err != nil && !strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) { if ignore_errors { log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err) } else { log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err) } } } func deleteAllObjects() { // Get a client // client := getS3Client() svc := s3.New(session.New(), cfg) // in := &s3.DeleteBucketInput{Bucket: aws.String(bucket)} // if _, err := svc.DeleteBucket(in); err != nil { // log.Printf("FATAL: Unable to delete bucket %s : %v", bucket, err) // } out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket}) if err != nil { log.Fatal("can't list objects") } n := len(out.Contents) if n == 0 { return } fmt.Printf("got existing %v objects, try to delete now...\n", n) for _, v := range out.Contents { svc.DeleteObject(&s3.DeleteObjectInput{ Bucket: &bucket, Key: v.Key, }) } out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket}) if err != nil { log.Fatal("can't list objects") } fmt.Printf("after delete, got %v objects\n", len(out.Contents)) // // Use multiple routines to do the actual delete // var doneDeletes sync.WaitGroup // // Loop deleting our versions reading as big a list as we can // var keyMarker, versionId *string // var err error // for loop := 1; ; loop++ { // // Delete all the existing objects and versions in the bucket // in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)} // if listVersions, listErr := client.ListObjectVersions(in); listErr == nil { // delete := &s3.Delete{Quiet: aws.Bool(true)} // for _, version := range listVersions.Versions { // delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId}) // } // for _, marker := range listVersions.DeleteMarkers { // delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId}) // } // if len(delete.Objects) > 0 { // // Start a delete routine // doDelete := func(bucket string, delete *s3.Delete) { // if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil { // err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error()) // } // doneDeletes.Done() // } // doneDeletes.Add(1) // go doDelete(bucket, delete) // } // // Advance to next versions // if listVersions.IsTruncated == nil || !*listVersions.IsTruncated { // break // } // keyMarker = listVersions.NextKeyMarker // versionId = listVersions.NextVersionIdMarker // } else { // // The bucket may not exist, just ignore in that case // if strings.HasPrefix(listErr.Error(), "NoSuchBucket") { // return // } // err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr) // break // } // } // // Wait for deletes to finish // doneDeletes.Wait() // If error, it is fatal // if err != nil { // log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err) // } } // canonicalAmzHeaders -- return the x-amz headers canonicalized func canonicalAmzHeaders(req *http.Request) string { // Parse out all x-amz headers var headers []string for header := range req.Header { norm := strings.ToLower(strings.TrimSpace(header)) if strings.HasPrefix(norm, "x-amz") { headers = append(headers, norm) } } // Put them in sorted order sort.Strings(headers) // Now add back the values for n, header := range headers { headers[n] = header + ":" + strings.Replace(req.Header.Get(header), "\n", " ", -1) } // Finally, put them back together if len(headers) > 0 { return strings.Join(headers, "\n") + "\n" } else { return "" } } func hmacSHA1(key []byte, content string) []byte { mac := hmac.New(sha1.New, key) mac.Write([]byte(content)) return mac.Sum(nil) } func setSignature(req *http.Request) { // Setup default parameters dateHdr := time.Now().UTC().Format("20060102T150405Z") req.Header.Set("X-Amz-Date", dateHdr) // Get the canonical resource and header canonicalResource := req.URL.EscapedPath() canonicalHeaders := canonicalAmzHeaders(req) stringToSign := req.Method + "\n" + req.Header.Get("Content-MD5") + "\n" + req.Header.Get("Content-Type") + "\n\n" + canonicalHeaders + canonicalResource hash := hmacSHA1([]byte(secret_key), stringToSign) signature := base64.StdEncoding.EncodeToString(hash) req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature)) } func runUpload(thread_num int, keys *sync.Map) { errcnt := 0 svc := s3.New(session.New(), cfg) for time.Now().Before(endtime) { objnum := atomic.AddInt32(&upload_count, 1) fileobj := bytes.NewReader(object_data) //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) key := fmt.Sprintf("Object-%d", objnum) r := &s3.PutObjectInput{ Bucket: &bucket, Key: &key, Body: fileobj, } req, _ := svc.PutObjectRequest(r) // Disable payload checksum calculation (very expensive) req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD") err := req.Send() if err != nil { errcnt++ atomic.AddInt32(&upload_slowdown_count, 1) atomic.AddInt32(&upload_count, -1) fmt.Println("upload err", err) //break } if errcnt > 2 { break } keys.Store(key, nil) fmt.Printf("upload thread %v, %v\r", thread_num, key) // req, _ := http.NewRequest("PUT", prefix, fileobj) // req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10)) // req.Header.Set("Content-MD5", object_data_md5) // setSignature(req) // if resp, err := httpClient.Do(req); err != nil { // log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err) // } else if resp != nil && resp.StatusCode != http.StatusOK { // if resp.StatusCode == http.StatusServiceUnavailable { // atomic.AddInt32(&upload_slowdown_count, 1) // atomic.AddInt32(&upload_count, -1) // } else { // fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp) // if resp.Body != nil { // body, _ := ioutil.ReadAll(resp.Body) // fmt.Printf("Body: %s\n", string(body)) // } // } // } } // Remember last done time upload_finish = time.Now() // One less thread atomic.AddInt32(&running_threads, -1) } func runDownload(thread_num int, keys *sync.Map) { errcnt := 0 svc := s3.New(session.New(), cfg) keys.Range(func(k, value interface{}) bool { // for { // objnum := atomic.AddInt32(&delete_count, 1) // if objnum > upload_count { // delete_count = 0 // } // key := fmt.Sprintf("Object-%d", objnum) // for key, _ := keys.Range() { if time.Now().After(endtime) { // fmt.Println("time ended for download") return false } var key string var ok bool if key, ok = k.(string); !ok { log.Fatal("convert key back error") } fmt.Printf("download thread %v, %v\r", thread_num, key) // atomic.AddInt32(&download_count, 1) // objnum := rand.Int31n(download_count) + 1 // key := fmt.Sprintf("Object-%d", objnum) r := &s3.GetObjectInput{ Bucket: &bucket, Key: &key, } req, resp := svc.GetObjectRequest(r) err := req.Send() if err != nil { errcnt++ atomic.AddInt32(&download_slowdown_count, 1) atomic.AddInt32(&download_count, -1) fmt.Println("download err", err) //break } if err == nil { _, err = io.Copy(ioutil.Discard, resp.Body) } if errcnt > 2 { return false } atomic.AddInt32(&download_count, 1) // prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) // req, _ := http.NewRequest("GET", prefix, nil) // setSignature(req) // if resp, err := httpClient.Do(req); err != nil { // log.Fatalf("FATAL: Error downloading object %s: %v", prefix, err) // } else if resp != nil && resp.Body != nil { // if resp.StatusCode == http.StatusServiceUnavailable { // atomic.AddInt32(&download_slowdown_count, 1) // atomic.AddInt32(&download_count, -1) // } else { // io.Copy(ioutil.Discard, resp.Body) // } // } return true }) // Remember last done time download_finish = time.Now() // One less thread atomic.AddInt32(&running_threads, -1) } func runDelete(thread_num int) { errcnt := 0 svc := s3.New(session.New(), cfg) for { objnum := atomic.AddInt32(&delete_count, 1) if objnum > upload_count { break } key := fmt.Sprintf("Object-%d", objnum) r := &s3.DeleteObjectInput{ Bucket: &bucket, Key: &key, } req, out := svc.DeleteObjectRequest(r) err := req.Send() if err != nil { errcnt++ atomic.AddInt32(&delete_slowdown_count, 1) atomic.AddInt32(&delete_count, -1) fmt.Println("download err", err, "out", out.String()) //break } if errcnt > 2 { break } fmt.Printf("delete thread %v, %v\r", thread_num, key) // prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) // req, _ := http.NewRequest("DELETE", prefix, nil) // setSignature(req) // if resp, err := httpClient.Do(req); err != nil { // log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err) // } else if resp != nil && resp.StatusCode == http.StatusServiceUnavailable { // atomic.AddInt32(&delete_slowdown_count, 1) // atomic.AddInt32(&delete_count, -1) // } } // Remember last done time delete_finish = time.Now() // One less thread atomic.AddInt32(&running_threads, -1) } var cfg *aws.Config func init() { // Parse command line myflag := flag.NewFlagSet("myflag", flag.ExitOnError) myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key") myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key") myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix") myflag.StringVar(&bucket, "b", "loadgen", "Bucket for testing") myflag.StringVar(®ion, "r", "us-east-1", "Region for testing") myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds") myflag.IntVar(&threads, "t", 1, "Number of threads to run") myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") if err := myflag.Parse(os.Args[1:]); err != nil { os.Exit(1) } // Check the arguments if access_key == "" { log.Fatal("Missing argument -a for access key.") } if secret_key == "" { log.Fatal("Missing argument -s for secret key.") } if url_host == "" { log.Fatal("Missing argument -s for host endpoint.") } var err error if object_size, err = bytefmt.ToBytes(sizeArg); err != nil { log.Fatalf("Invalid -z argument for object size: %v", err) } } func main() { // Hello fmt.Println("Wasabi benchmark program v2.0") //fmt.Println("accesskey:", access_key, "secretkey:", secret_key) cfg = &aws.Config{ Endpoint: aws.String(url_host), Credentials: credentials.NewStaticCredentials(access_key, secret_key, ""), Region: aws.String(region), // DisableParamValidation: aws.Bool(true), DisableComputeChecksums: aws.Bool(true), S3ForcePathStyle: aws.Bool(true), } // Echo the parameters logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s", url_host, bucket, region, duration_secs, threads, loops, sizeArg)) // Initialize data for the bucket object_data = make([]byte, object_size) rand.Read(object_data) hasher := md5.New() hasher.Write(object_data) object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) // Create the bucket and delete all the objects createBucket(true) deleteAllObjects() // Loop running the tests for loop := 1; loop <= loops; loop++ { // reset counters upload_count = 0 upload_slowdown_count = 0 download_count = 0 download_slowdown_count = 0 delete_count = 0 delete_slowdown_count = 0 keys := &sync.Map{} // Run the upload case running_threads = int32(threads) starttime := time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 1; n <= threads; n++ { go runUpload(n, keys) } // Wait for it to finish for atomic.LoadInt32(&running_threads) > 0 { time.Sleep(time.Millisecond) } upload_time := upload_finish.Sub(starttime).Seconds() bps := float64(uint64(upload_count)*object_size) / upload_time logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count)) // count := 0 // keys.Range(func(k, value interface{}) bool { // count++ // return true // }) // fmt.Println("map got ", count) // Run the download case running_threads = int32(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 1; n <= threads; n++ { go runDownload(n, keys) } // Wait for it to finish for atomic.LoadInt32(&running_threads) > 0 { time.Sleep(time.Millisecond) } download_time := download_finish.Sub(starttime).Seconds() bps = float64(uint64(download_count)*object_size) / download_time logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d", loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count)) // Run the delete case running_threads = int32(threads) starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 1; n <= threads; n++ { go runDelete(n) } // Wait for it to finish for atomic.LoadInt32(&running_threads) > 0 { time.Sleep(time.Millisecond) } delete_time := delete_finish.Sub(starttime).Seconds() logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d", loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count)) } // All done }