s3-benchmark: Implement multiple buckets.

Signed-off-by: Mark Nelson <mnelson@redhat.com>
master
Mark Nelson 2019-08-11 19:26:21 -04:00
parent 0c89e9a7a1
commit 389e714111
1 changed files with 62 additions and 39 deletions

View File

@ -33,8 +33,9 @@ import (
)
// Global variables
var access_key, secret_key, url_host, bucket, region, sizeArg string
var duration_secs, threads, loops int
var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string
var buckets []string
var duration_secs, threads, loops, bucket_count int
var object_size uint64
var object_data []byte
var object_data_md5 string
@ -94,34 +95,35 @@ func getS3Client() *s3.S3 {
return client
}
func createBucket(ignore_errors bool) {
func createBucket(bucket_num int, ignore_errors bool) {
// Get a client
// client := getS3Client()
// Create our bucket (may already exist without error)
svc := s3.New(session.New(), cfg)
in := &s3.CreateBucketInput{Bucket: aws.String(bucket)}
log.Printf(buckets[bucket_num])
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
if _, err := svc.CreateBucket(in); err != nil {
if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
strings.Contains(err.Error(), "BucketAlreadyExists") {
return
}
if ignore_errors {
log.Printf("WARNING: createBucket %s error, ignoring %v", bucket, err)
log.Printf("WARNING: createBucket %s error, ignoring %v", buckets[bucket_num], err)
} else {
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", bucket, err)
log.Fatalf("FATAL: Unable to create bucket %s (is your access and secret correct?): %v", buckets[bucket_num], err)
}
}
}
func deleteAllObjects() {
func deleteAllObjects(bucket_num int) {
// Get a client
// client := getS3Client()
svc := s3.New(session.New(), cfg)
// in := &s3.DeleteBucketInput{Bucket: aws.String(bucket)}
// in := &s3.DeleteBucketInput{Bucket: aws.String(buckets[bucket_num])}
// if _, err := svc.DeleteBucket(in); err != nil {
// log.Printf("FATAL: Unable to delete bucket %s : %v", bucket, err)
// log.Printf("FATAL: Unable to delete bucket %s : %v", buckets[bucket_num], err)
// }
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
log.Fatal("can't list objects")
}
@ -133,11 +135,11 @@ func deleteAllObjects() {
for _, v := range out.Contents {
svc.DeleteObject(&s3.DeleteObjectInput{
Bucket: &bucket,
Bucket: &buckets[bucket_num],
Key: v.Key,
})
}
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &bucket})
out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
if err != nil {
log.Fatal("can't list objects")
}
@ -239,16 +241,17 @@ func setSignature(req *http.Request) {
}
func runUpload(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
errcnt := 0
svc := s3.New(session.New(), cfg)
for time.Now().Before(endtime) {
objnum := atomic.AddInt32(&upload_count, 1)
fileobj := bytes.NewReader(object_data)
//prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
//prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum)
key := fmt.Sprintf("Object-%d", objnum)
r := &s3.PutObjectInput{
Bucket: &bucket,
Bucket: &buckets[bucket_num],
Key: &key,
Body: fileobj,
}
@ -297,6 +300,7 @@ func runUpload(thread_num int, keys *sync.Map) {
}
func runDownload(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
errcnt := 0
svc := s3.New(session.New(), cfg)
@ -321,7 +325,7 @@ func runDownload(thread_num int, keys *sync.Map) {
fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
r := &s3.GetObjectInput{
Bucket: &bucket,
Bucket: &buckets[bucket_num],
Key: &key,
}
@ -365,17 +369,29 @@ func runDownload(thread_num int, keys *sync.Map) {
atomic.AddInt32(&running_threads, -1)
}
func runDelete(thread_num int) {
func runDelete(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
errcnt := 0
svc := s3.New(session.New(), cfg)
for {
objnum := atomic.AddInt32(&delete_count, 1)
if objnum > upload_count {
break
}
key := fmt.Sprintf("Object-%d", objnum)
keys.Range(func(k, value interface{}) bool {
// objnum := atomic.AddInt32(&delete_count, 1)
// if objnum > upload_count {
// break
// }
// key := fmt.Sprintf("Object-%d", objnum)
var key string
var ok bool
if key, ok = k.(string); !ok {
log.Fatal("convert key back error")
}
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
r := &s3.DeleteObjectInput{
Bucket: &bucket,
Bucket: &buckets[bucket_num],
Key: &key,
}
@ -389,7 +405,7 @@ func runDelete(thread_num int) {
//break
}
if errcnt > 2 {
break
return false
}
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
@ -402,7 +418,10 @@ func runDelete(thread_num int) {
// atomic.AddInt32(&delete_slowdown_count, 1)
// atomic.AddInt32(&delete_count, -1)
// }
}
return true
})
// Remember last done time
delete_finish = time.Now()
// One less thread
@ -417,7 +436,8 @@ func init() {
myflag.StringVar(&access_key, "a", os.Getenv("AWS_ACCESS_KEY_ID"), "Access key")
myflag.StringVar(&secret_key, "s", os.Getenv("AWS_SECRET_ACCESS_KEY"), "Secret key")
myflag.StringVar(&url_host, "u", os.Getenv("AWS_HOST"), "URL for host with method prefix")
myflag.StringVar(&bucket, "b", "loadgen", "Bucket for testing")
myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets")
myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
@ -458,8 +478,8 @@ func main() {
}
// Echo the parameters
logit(fmt.Sprintf("Parameters: url=%s, bucket=%s, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
url_host, bucket, region, duration_secs, threads, loops, sizeArg))
logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg))
// Initialize data for the bucket
object_data = make([]byte, object_size)
@ -468,9 +488,12 @@ func main() {
hasher.Write(object_data)
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
// Create the bucket and delete all the objects
createBucket(true)
deleteAllObjects()
// Create the buckets and delete all the objects
for i := 0; i < bucket_count; i++ {
buckets = append(buckets, fmt.Sprintf("%s-%d", bucket_prefix, i))
createBucket(i, true)
deleteAllObjects(i)
}
var uploadspeed, downloadspeed float64
@ -485,14 +508,14 @@ func main() {
delete_count = 0
delete_slowdown_count = 0
keys := &sync.Map{}
var keys []*sync.Map
// Run the upload case
running_threads = int32(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runUpload(n, keys)
for n := 0; n < threads; n++ {
keys = append(keys, &sync.Map{})
go runUpload(n, keys[n])
}
// Wait for it to finish
@ -517,8 +540,8 @@ func main() {
running_threads = int32(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runDownload(n, keys)
for n := 0; n < threads; n++ {
go runDownload(n, keys[n])
}
// Wait for it to finish
@ -537,8 +560,8 @@ func main() {
running_threads = int32(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 1; n <= threads; n++ {
go runDelete(n)
for n := 0; n < threads; n++ {
go runDelete(n, keys[n])
}
// Wait for it to finish