Merge pull request #3 from markhpc/wip-object-time-count

s3-benchmark: Add maximum object count setting.
master
Mark Nelson 2019-08-11 19:52:39 -05:00 committed by GitHub
commit 32a379a38d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 33 additions and 22 deletions

View File

@ -36,10 +36,10 @@ import (
var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string
var buckets []string
var duration_secs, threads, loops, bucket_count int
var object_size uint64
var object_data []byte
var object_data_md5 string
var running_threads, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int32
var object_size uint64
var running_threads, object_count, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int64
var endtime, upload_finish, download_finish, delete_finish time.Time
func logit(msg string) {
@ -185,8 +185,15 @@ func runUpload(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
errcnt := 0
svc := s3.New(session.New(), cfg)
for time.Now().Before(endtime) {
objnum := atomic.AddInt32(&upload_count, 1)
for {
if duration_secs > -1 && time.Now().After(endtime) {
break
}
objnum := atomic.AddInt64(&upload_count, 1)
if object_count > -1 && objnum > object_count {
objnum = atomic.AddInt64(&upload_count, -1)
break
}
fileobj := bytes.NewReader(object_data)
//prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum)
@ -203,8 +210,8 @@ func runUpload(thread_num int, keys *sync.Map) {
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt32(&upload_slowdown_count, 1)
atomic.AddInt32(&upload_count, -1)
atomic.AddInt64(&upload_slowdown_count, 1)
atomic.AddInt64(&upload_count, -1)
fmt.Println("upload err", err)
//break
}
@ -217,7 +224,7 @@ func runUpload(thread_num int, keys *sync.Map) {
// Remember last done time
upload_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
atomic.AddInt64(&running_threads, -1)
}
func runDownload(thread_num int, keys *sync.Map) {
@ -226,7 +233,7 @@ func runDownload(thread_num int, keys *sync.Map) {
svc := s3.New(session.New(), cfg)
keys.Range(func(k, value interface{}) bool {
if time.Now().After(endtime) {
if duration_secs > -1 && time.Now().After(endtime) {
// fmt.Println("time ended for download")
return false
}
@ -247,8 +254,8 @@ func runDownload(thread_num int, keys *sync.Map) {
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt32(&download_slowdown_count, 1)
atomic.AddInt32(&download_count, -1)
atomic.AddInt64(&download_slowdown_count, 1)
atomic.AddInt64(&download_count, -1)
fmt.Println("download err", err)
//break
}
@ -258,14 +265,14 @@ func runDownload(thread_num int, keys *sync.Map) {
if errcnt > 2 {
return false
}
atomic.AddInt32(&download_count, 1)
atomic.AddInt64(&download_count, 1)
return true
})
// Remember last done time
download_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
atomic.AddInt64(&running_threads, -1)
}
func runDelete(thread_num int, keys *sync.Map) {
@ -291,8 +298,8 @@ func runDelete(thread_num int, keys *sync.Map) {
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt32(&delete_slowdown_count, 1)
atomic.AddInt32(&delete_count, -1)
atomic.AddInt64(&delete_slowdown_count, 1)
atomic.AddInt64(&delete_count, -1)
fmt.Println("download err", err, "out", out.String())
//break
}
@ -306,7 +313,7 @@ func runDelete(thread_num int, keys *sync.Map) {
// Remember last done time
delete_finish = time.Now()
// One less thread
atomic.AddInt32(&running_threads, -1)
atomic.AddInt64(&running_threads, -1)
}
var cfg *aws.Config
@ -320,7 +327,8 @@ func init() {
myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets")
myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
myflag.StringVar(&region, "r", "us-east-1", "Region for testing")
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
@ -329,6 +337,9 @@ func init() {
}
// Check the arguments
if object_count < 0 && duration_secs < 0 {
log.Fatal("The number of objects and duration can not both be unlimited")
}
if access_key == "" {
log.Fatal("Missing argument -a for access key.")
}
@ -391,7 +402,7 @@ func main() {
var keys []*sync.Map
// Run the upload case
running_threads = int32(threads)
running_threads = int64(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
@ -400,7 +411,7 @@ func main() {
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
upload_time := upload_finish.Sub(starttime).Seconds()
@ -412,7 +423,7 @@ func main() {
uploadspeed = bps / bytefmt.MEGABYTE
// Run the download case
running_threads = int32(threads)
running_threads = int64(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
@ -420,7 +431,7 @@ func main() {
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
download_time := download_finish.Sub(starttime).Seconds()
@ -432,7 +443,7 @@ func main() {
downloadspeed = bps / bytefmt.MEGABYTE
// Run the delete case
running_threads = int32(threads)
running_threads = int64(threads)
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
@ -440,7 +451,7 @@ func main() {
}
// Wait for it to finish
for atomic.LoadInt32(&running_threads) > 0 {
for atomic.LoadInt64(&running_threads) > 0 {
time.Sleep(time.Millisecond)
}
delete_time := delete_finish.Sub(starttime).Seconds()