diff --git a/README.md b/README.md index 7961f93..31de8c4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,35 @@ # s3-benchmark -Performance test for comparison of AWS versus Wasabi S3 systems. +s3-benchmark is a program for performing S3 operations PUT, GET, and DELETE for objects. Besides the +bucket configuration, the object size and number of threads can be given for different tests. + +The test is loosely based on the Nasuni benchmark used to test the performance of different cloud +storage providers. + +# Building the Program +If the test is being run on the Ubuntu version 16.04 LTS (the current long term release), the binary +executable s3-benchmark.ubuntu will run the benchmark without building. + +Otherwise, to build the test, you must install the Go 1.7 system along with the supporting libraries. + +# Command Line Arguments +Below are the command line arguments to the program: + + -a string + Access key + -b string + Bucket for testing (default "wasabi-benchmark-bucket") + -d int + Duration of each test in seconds (default 60) + -l int + Number of times to repeat test (default 1) + -s string + Secret key + -t int + Number of threads to run (default 1) + -u string + URL for host with method prefix (default "http://s3.wasabisys.com") + -z string + Size of objects in bytes with postfix K, M, and G (default "1M") + +# Example Benchmark + diff --git a/s3-benchmark.go b/s3-benchmark.go new file mode 100644 index 0000000..126b921 --- /dev/null +++ b/s3-benchmark.go @@ -0,0 +1,372 @@ +// s3-benchmark.go +// Copyright (c) 2017 Wasabi Technology, Inc. + +package main + +import ( + "bytes" + "crypto/hmac" + "crypto/md5" + "crypto/sha1" + "crypto/tls" + "encoding/base64" + "flag" + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/pivotal-golang/bytefmt" + "io" + "io/ioutil" + "log" + "math/rand" + "net" + "net/http" + "os" + "sort" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +// Global variables +var access_key, secret_key, url_host, bucket string +var duration_secs, threads, loops int +var object_size uint64 +var object_data []byte +var object_data_md5 string +var running_threads, upload_count, download_count, delete_count int32 +var endtime, upload_finish, download_finish, delete_finish time.Time + +func logit(msg string) { + fmt.Println(msg) + logfile, _ := os.OpenFile("benchmark.log", os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666) + if logfile != nil { + logfile.WriteString(time.Now().Format(http.TimeFormat) + ": " + msg + "\n") + logfile.Close() + } +} + +// Our HTTP transport used for the roundtripper below +var HTTPTransport http.RoundTripper = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + Dial: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).Dial, + TLSHandshakeTimeout: 10 * time.Second, + ExpectContinueTimeout: 0, + // Allow an unlimited number of idle connections + MaxIdleConnsPerHost: 4096, + MaxIdleConns: 0, + // But limit their idle time + IdleConnTimeout: time.Minute, + // Ignore TLS errors + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, +} + +var httpClient = &http.Client{Transport: HTTPTransport} + +func getS3Client() *s3.S3 { + // Build our config + creds := credentials.NewStaticCredentials(access_key, secret_key, "") + loglevel := aws.LogOff + // Build the rest of the configuration + awsConfig := &aws.Config{ + Region: aws.String("us-east-1"), + Endpoint: aws.String(url_host), + Credentials: creds, + LogLevel: &loglevel, + S3ForcePathStyle: aws.Bool(true), + S3Disable100Continue: aws.Bool(true), + // Comment following to use default transport + HTTPClient: &http.Client{Transport: HTTPTransport}, + } + session := session.New(awsConfig) + client := s3.New(session) + if client == nil { + log.Fatalf("FATAL: Unable to create new client.") + } + // Return success + return client +} + +func createBucket() { + // Get a client + client := getS3Client() + // Create our bucket (may already exist without error) + in := &s3.CreateBucketInput{Bucket: aws.String(bucket)} + if _, err := client.CreateBucket(in); err != nil { + log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err) + } +} + +func deleteAllObjects() { + // Get a client + client := getS3Client() + // Use multiple routines to do the actual delete + var doneDeletes sync.WaitGroup + // Loop deleting our versions reading as big a list as we can + var keyMarker, versionId *string + var err error + for loop := 1; ; loop++ { + // Delete all the existing objects and versions in the bucket + in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)} + if listVersions, listErr := client.ListObjectVersions(in); listErr == nil { + delete := &s3.Delete{Quiet: aws.Bool(true)} + for _, version := range listVersions.Versions { + delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId}) + } + for _, marker := range listVersions.DeleteMarkers { + delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId}) + } + if len(delete.Objects) > 0 { + // Start a delete routine + doDelete := func(bucket string, delete *s3.Delete) { + if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil { + err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error()) + } + doneDeletes.Done() + } + doneDeletes.Add(1) + go doDelete(bucket, delete) + } + // Advance to next versions + if listVersions.IsTruncated == nil || !*listVersions.IsTruncated { + break + } + keyMarker = listVersions.NextKeyMarker + versionId = listVersions.NextVersionIdMarker + } else { + // The bucket may not exist, just ignore in that case + if strings.HasPrefix(listErr.Error(), "NoSuchBucket") { + return + } + err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr) + break + } + } + // Wait for deletes to finish + doneDeletes.Wait() + // If error, it is fatal + if err != nil { + log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err) + } +} + +// canonicalAmzHeaders -- return the x-amz headers canonicalized +func canonicalAmzHeaders(req *http.Request) string { + // Parse out all x-amz headers + var headers []string + for header := range req.Header { + norm := strings.ToLower(strings.TrimSpace(header)) + if strings.HasPrefix(norm, "x-amz") { + headers = append(headers, norm) + } + } + // Put them in sorted order + sort.Strings(headers) + // Now add back the values + for n, header := range headers { + headers[n] = header + ":" + strings.Replace(req.Header.Get(header), "\n", " ", -1) + } + // Finally, put them back together + if len(headers) > 0 { + return strings.Join(headers, "\n") + "\n" + } else { + return "" + } +} + +func hmacSHA1(key []byte, content string) []byte { + mac := hmac.New(sha1.New, key) + mac.Write([]byte(content)) + return mac.Sum(nil) +} + +func setSignature(req *http.Request) { + // Setup default parameters + dateHdr := time.Now().UTC().Format("20060102T150405Z") + req.Header.Set("X-Amz-Date", dateHdr) + // Get the canonical resource and header + canonicalResource := req.URL.EscapedPath() + canonicalHeaders := canonicalAmzHeaders(req) + stringToSign := req.Method + "\n" + req.Header.Get("Content-MD5") + "\n" + req.Header.Get("Content-Type") + "\n\n" + + canonicalHeaders + canonicalResource + hash := hmacSHA1([]byte(secret_key), stringToSign) + signature := base64.StdEncoding.EncodeToString(hash) + req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature)) +} + +func runUpload(thread_num int) { + for time.Now().Before(endtime) { + objnum := atomic.AddInt32(&upload_count, 1) + fileobj := bytes.NewReader(object_data) + prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) + req, _ := http.NewRequest("PUT", prefix, fileobj) + req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10)) + req.Header.Set("Content-MD5", object_data_md5) + setSignature(req) + if resp, err := httpClient.Do(req); err != nil { + log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err) + } else if resp.StatusCode != http.StatusOK { + fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp) + if resp.Body != nil { + body, _ := ioutil.ReadAll(resp.Body) + fmt.Printf("Body: %s\n", string(body)) + } + } + } + // Remember last done time + upload_finish = time.Now() + // One less thread + atomic.AddInt32(&running_threads, -1) +} + +func runDownload(thread_num int) { + for time.Now().Before(endtime) { + atomic.AddInt32(&download_count, 1) + objnum := rand.Int31n(upload_count) + 1 + prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) + req, _ := http.NewRequest("GET", prefix, nil) + setSignature(req) + if resp, err := httpClient.Do(req); err != nil { + log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err) + } else if resp != nil && resp.Body != nil { + io.Copy(ioutil.Discard, resp.Body) + } + } + // Remember last done time + download_finish = time.Now() + // One less thread + atomic.AddInt32(&running_threads, -1) +} + +func runDelete(thread_num int) { + for { + objnum := atomic.AddInt32(&delete_count, 1) + if objnum > upload_count { + break + } + prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum) + req, _ := http.NewRequest("DELETE", prefix, nil) + setSignature(req) + if _, err := httpClient.Do(req); err != nil { + log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err) + } + } + // Remember last done time + delete_finish = time.Now() + // One less thread + atomic.AddInt32(&running_threads, -1) +} + +func main() { + // Hello + fmt.Println("Wasabi benchmark program v2.0") + + // Parse command line + myflag := flag.NewFlagSet("myflag", flag.ExitOnError) + myflag.StringVar(&access_key, "a", "", "Access key") + myflag.StringVar(&secret_key, "s", "", "Secret key") + myflag.StringVar(&url_host, "u", "http://s3.wasabisys.com", "URL for host with method prefix") + myflag.StringVar(&bucket, "b", "wasabi-benchmark-bucket", "Bucket for testing") + myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds") + myflag.IntVar(&threads, "t", 1, "Number of threads to run") + myflag.IntVar(&loops, "l", 1, "Number of times to repeat test") + var sizeArg string + myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G") + if err := myflag.Parse(os.Args[1:]); err != nil { + os.Exit(1) + } + + // Check the arguments + if access_key == "" { + log.Fatal("Missing argument -a for access key.") + } + if secret_key == "" { + log.Fatal("Missing argument -s for secret key.") + } + var err error + if object_size, err = bytefmt.ToBytes(sizeArg); err != nil { + log.Fatalf("Invalid -z argument for object size: %v", err) + } + + // Echo the parameters + logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, duration=%d, threads=%d, loops=%d, size=%s", + url_host, bucket, duration_secs, threads, loops, sizeArg)) + + // Initialize data for the bucket + object_data = make([]byte, object_size) + rand.Read(object_data) + hasher := md5.New() + hasher.Write(object_data) + object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil)) + + // Create the bucket and delete all the objects + createBucket() + deleteAllObjects() + + // Loop running the tests + for loop := 1; loop <= loops; loop++ { + + // Run the upload case + running_threads = int32(threads) + starttime := time.Now() + endtime = starttime.Add(time.Second * time.Duration(duration_secs)) + for n := 1; n <= threads; n++ { + go runUpload(n) + } + + // Wait for it to finish + for atomic.LoadInt32(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + upload_time := upload_finish.Sub(starttime).Seconds() + + bps := float64(uint64(upload_count)*object_size) / upload_time + logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.", + loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time)) + + // Run the download case + running_threads = int32(threads) + starttime = time.Now() + endtime = starttime.Add(time.Second * time.Duration(duration_secs)) + for n := 1; n <= threads; n++ { + go runDownload(n) + } + + // Wait for it to finish + for atomic.LoadInt32(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + download_time := download_finish.Sub(starttime).Seconds() + + bps = float64(uint64(download_count)*object_size) / download_time + logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec.", + loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time)) + + // Run the delete case + running_threads = int32(threads) + starttime = time.Now() + endtime = starttime.Add(time.Second * time.Duration(duration_secs)) + for n := 1; n <= threads; n++ { + go runDelete(n) + } + + // Wait for it to finish + for atomic.LoadInt32(&running_threads) > 0 { + time.Sleep(time.Millisecond) + } + delete_time := delete_finish.Sub(starttime).Seconds() + + logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec.", + loop, delete_time, float64(upload_count)/delete_time)) + } + + // All done + fmt.Println("Benchmark completed.") +} diff --git a/s3-benchmark.ubuntu b/s3-benchmark.ubuntu new file mode 100755 index 0000000..ff47bdf Binary files /dev/null and b/s3-benchmark.ubuntu differ