Merge pull request #1 from markhpc/wip-multiple-buckets

s3-benchmark: Add multiple Buckets
Mark Nelson 2019-08-11 19:44:43 -05:00 committed by GitHub
commit f2fc9a1e25
1 changed file with 62 additions and 39 deletions
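In short: the single bucket name is replaced by a bucket_prefix plus a bucket_count, buckets are created as <prefix>-<index>, and each worker thread is pinned to one bucket by taking its thread number modulo the bucket count. Below is a minimal sketch of that mapping, distilled from the patch that follows; the standalone wrapper (package main, the printed mapping) is illustrative only, while the names and defaults match the patch.

package main

import "fmt"

// Illustrative only: mirrors the bucket naming and thread-to-bucket
// assignment introduced by this patch.
func main() {
    bucketPrefix := "hotsauce_benchmark" // new -p flag default
    bucketCount := 3                     // new -b flag (previously the bucket name)
    threads := 8                         // -t flag, unchanged

    // Buckets are named <prefix>-<index>, e.g. hotsauce_benchmark-0.
    var buckets []string
    for i := 0; i < bucketCount; i++ {
        buckets = append(buckets, fmt.Sprintf("%s-%d", bucketPrefix, i))
    }

    // Each upload/download/delete goroutine uses bucket (thread % count).
    for threadNum := 0; threadNum < threads; threadNum++ {
        fmt.Printf("thread %d -> %s\n", threadNum, buckets[threadNum%bucketCount])
    }
}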


@@ -33,8 +33,9 @@ import (
 )
 // Global variables
-var access_key, secret_key, url_host, bucket, region, sizeArg string
-var duration_secs, threads, loops int
+var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string
+var buckets []string
+var duration_secs, threads, loops, bucket_count int
 var object_size uint64
 var object_data []byte
 var object_data_md5 string
@@ -94,34 +95,35 @@ func getS3Client() *s3.S3 {
     return client
 }
-func createBucket(ignore_errors bool) {
+func createBucket(bucket_num int, ignore_errors bool) {
     // Get a client
     // client := getS3Client()
     // Create our bucket (may already exist without error)
     svc := s3.New(session.New(), cfg)
-    in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
+    log.Printf(buckets[bucket_num])
+    in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
     if _, err := svc.CreateBucket(in); err != nil {
         if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
             strings.Contains(err.Error(), "BucketAlreadyExists") {
             return
         }
         if ignore_errors {
-            log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err)
+            log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err)
         } else {
-            log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err)
+            log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
         }
     }
 }
-func deleteAllObjects() {
+func deleteAllObjects(bucket_num int) {
     // Get a client
     // client := getS3Client()
     svc := s3.New(session.New(), cfg)
-    // in := &s3.DeleteBucketInput{Bucket: aws.String(bucket)}
+    // in := &s3.DeleteBucketInput{Bucket: aws.String(buckets[bucket_num])}
     // if _, err := svc.DeleteBucket(in); err != nil {
-    //     log.Printf("FATAL: Unable to delete bucket %s : %v", bucket, err)
+    //     log.Printf("FATAL: Unable to delete bucket %s : %v", buckets[bucket_num], err)
     // }
-    out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
+    out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
     if err != nil {
         log.Fatal("can't list objects")
     }
@@ -133,11 +135,11 @@ func deleteAllObjects() {
     for _, v := range out.Contents {
         svc.DeleteObject(&s3.DeleteObjectInput{
-            Bucket: &bucket,
+            Bucket: &buckets[bucket_num],
             Key:    v.Key,
         })
     }
-    out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
+    out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
     if err != nil {
         log.Fatal("can't list objects")
     }
@@ -239,16 +241,17 @@ func setSignature(req *http.Request) {
 }
 func runUpload(thread_num int, keys *sync.Map) {
+    bucket_num := thread_num % bucket_count
     errcnt := 0
     svc := s3.New(session.New(), cfg)
     for time.Now().Before(endtime) {
         objnum := atomic.AddInt32(&upload_count, 1)
         fileobj := bytes.NewReader(object_data)
-        //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
+        //prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum)
         key := fmt.Sprintf("Object-%d", objnum)
         r := &s3.PutObjectInput{
-            Bucket: &bucket,
+            Bucket: &buckets[bucket_num],
             Key:    &key,
             Body:   fileobj,
         }
@@ -297,6 +300,7 @@ func runUpload(thread_num int, keys *sync.Map) {
 }
 func runDownload(thread_num int, keys *sync.Map) {
+    bucket_num := thread_num % bucket_count
     errcnt := 0
     svc := s3.New(session.New(), cfg)
@@ -321,7 +325,7 @@ func runDownload(thread_num int, keys *sync.Map) {
         fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
         r := &s3.GetObjectInput{
-            Bucket: &bucket,
+            Bucket: &buckets[bucket_num],
             Key:    &key,
         }
@@ -365,17 +369,29 @@ func runDownload(thread_num int, keys *sync.Map) {
     atomic.AddInt32(&running_threads, -1)
 }
-func runDelete(thread_num int) {
+func runDelete(thread_num int, keys *sync.Map) {
+    bucket_num := thread_num % bucket_count
     errcnt := 0
     svc := s3.New(session.New(), cfg)
-    for {
-        objnum := atomic.AddInt32(&delete_count, 1)
-        if objnum > upload_count {
-            break
-        }
-        key := fmt.Sprintf("Object-%d", objnum)
+    keys.Range(func(k, value interface{}) bool {
+        // objnum := atomic.AddInt32(&delete_count, 1)
+        // if objnum > upload_count {
+        //     break
+        // }
+        // key := fmt.Sprintf("Object-%d", objnum)
+        var key string
+        var ok bool
+        if key, ok = k.(string); !ok {
+            log.Fatal("convert key back error")
+        }
+        fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
         r := &s3.DeleteObjectInput{
-            Bucket: &bucket,
+            Bucket: &buckets[bucket_num],
             Key:    &key,
         }
@@ -389,7 +405,7 @@ func runDelete(thread_num int) {
             //break
         }
         if errcnt > 2 {
-            break
+            return false
         }
         fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
@@ -402,7 +418,10 @@ func runDelete(thread_num int) {
         //     atomic.AddInt32(&delete_slowdown_count, 1)
         //     atomic.AddInt32(&delete_count, -1)
         // }
-    }
+        return true
+    })
     // Remember last done time
     delete_finish = time.Now()
     // One less thread
@@ -417,7 +436,8 @@ func init() {
     myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key")
     myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key")
     myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix")
-    myflag.StringVar(&bucket, "b", "loadgen", "Bucket for testing")
+    myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets")
+    myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
     myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
     myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
     myflag.IntVar(&threads, "t", 1, "Number of threads to run")
@@ -458,8 +478,8 @@ func main() {
     }
     // Echo the parameters
-    logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
-        url_host, bucket, region, duration_secs, threads, loops, sizeArg))
+    logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
+        url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg))
     // Initialize data for the bucket
     object_data = make([]byte, object_size)
@@ -468,9 +488,12 @@ func main() {
     hasher.Write(object_data)
     object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
-    // Create the bucket and delete all the objects
-    createBucket(true)
-    deleteAllObjects()
+    // Create the buckets and delete all the objects
+    for i := 0; i < bucket_count; i++ {
+        buckets = append(buckets, fmt.Sprintf("%s-%d", bucket_prefix, i))
+        createBucket(i, true)
+        deleteAllObjects(i)
+    }
     var uploadspeed, downloadspeed float64
@@ -485,14 +508,14 @@ func main() {
         delete_count = 0
         delete_slowdown_count = 0
-        keys := &sync.Map{}
+        var keys []*sync.Map
         // Run the upload case
         running_threads = int32(threads)
         starttime := time.Now()
         endtime = starttime.Add(time.Second * time.Duration(duration_secs))
-        for n := 1; n <= threads; n++ {
-            go runUpload(n, keys)
+        for n := 0; n < threads; n++ {
+            keys = append(keys, &sync.Map{})
+            go runUpload(n, keys[n])
         }
         // Wait for it to finish
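main now keeps one sync.Map per thread: thread n records its uploaded keys in keys[n], and the matching download and delete goroutines receive that same map, so every thread reads and deletes exactly the objects it wrote (to its own bucket, via thread_num % bucket_count). A minimal sketch of that per-thread wiring, with dummy upload/delete workers standing in for runUpload/runDelete and a WaitGroup standing in for the tool's atomic running_threads counter (all illustrative, not part of the patch):

package main

import (
    "fmt"
    "sync"
)

// Illustrative workers: each upload/delete pair shares one *sync.Map,
// so a thread only ever touches its own keys.
func upload(threadNum int, keys *sync.Map, wg *sync.WaitGroup) {
    defer wg.Done()
    keys.Store(fmt.Sprintf("Object-%d", threadNum), nil)
}

func del(threadNum int, keys *sync.Map, wg *sync.WaitGroup) {
    defer wg.Done()
    keys.Range(func(k, _ interface{}) bool {
        fmt.Printf("thread %d deletes %v\n", threadNum, k)
        return true
    })
}

func main() {
    threads := 4
    var wg sync.WaitGroup

    // One key map per thread, exactly as the patch allocates them.
    var keys []*sync.Map
    for n := 0; n < threads; n++ {
        keys = append(keys, &sync.Map{})
    }

    wg.Add(threads)
    for n := 0; n < threads; n++ {
        go upload(n, keys[n], &wg)
    }
    wg.Wait()

    wg.Add(threads)
    for n := 0; n < threads; n++ {
        go del(n, keys[n], &wg)
    }
    wg.Wait()
}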
@@ -517,8 +540,8 @@ func main() {
         running_threads = int32(threads)
         starttime = time.Now()
         endtime = starttime.Add(time.Second * time.Duration(duration_secs))
-        for n := 1; n <= threads; n++ {
-            go runDownload(n, keys)
+        for n := 0; n < threads; n++ {
+            go runDownload(n, keys[n])
         }
         // Wait for it to finish
@@ -537,8 +560,8 @@ func main() {
         running_threads = int32(threads)
         starttime = time.Now()
         endtime = starttime.Add(time.Second * time.Duration(duration_secs))
-        for n := 1; n <= threads; n++ {
-            go runDelete(n)
+        for n := 0; n < threads; n++ {
+            go runDelete(n, keys[n])
         }
         // Wait for it to finish