s3-benchmark: Add maximum object count setting.
Signed-off-by: Mark Nelson <mnelson@redhat.com>master
parent
c709ee84e7
commit
60225d82d7
|
@ -36,10 +36,10 @@ import (
|
|||
var access_key, secret_key, url_host, bucket_prefix, region, sizeArg string
|
||||
var buckets []string
|
||||
var duration_secs, threads, loops, bucket_count int
|
||||
var object_size uint64
|
||||
var object_data []byte
|
||||
var object_data_md5 string
|
||||
var running_threads, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int32
|
||||
var object_size uint64
|
||||
var running_threads, object_count, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int64
|
||||
var endtime, upload_finish, download_finish, delete_finish time.Time
|
||||
|
||||
func logit(msg string) {
|
||||
|
@ -185,8 +185,15 @@ func runUpload(thread_num int, keys *sync.Map) {
|
|||
bucket_num := thread_num % bucket_count
|
||||
errcnt := 0
|
||||
svc := s3.New(session.New(), cfg)
|
||||
for time.Now().Before(endtime) {
|
||||
objnum := atomic.AddInt32(&upload_count, 1)
|
||||
for {
|
||||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
break
|
||||
}
|
||||
objnum := atomic.AddInt64(&upload_count, 1)
|
||||
if object_count > -1 && objnum > object_count {
|
||||
objnum = atomic.AddInt64(&upload_count, -1)
|
||||
break
|
||||
}
|
||||
fileobj := bytes.NewReader(object_data)
|
||||
//prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, buckets[bucket_num], objnum)
|
||||
|
||||
|
@ -203,8 +210,8 @@ func runUpload(thread_num int, keys *sync.Map) {
|
|||
err := req.Send()
|
||||
if err != nil {
|
||||
errcnt++
|
||||
atomic.AddInt32(&upload_slowdown_count, 1)
|
||||
atomic.AddInt32(&upload_count, -1)
|
||||
atomic.AddInt64(&upload_slowdown_count, 1)
|
||||
atomic.AddInt64(&upload_count, -1)
|
||||
fmt.Println("upload err", err)
|
||||
//break
|
||||
}
|
||||
|
@ -217,7 +224,7 @@ func runUpload(thread_num int, keys *sync.Map) {
|
|||
// Remember last done time
|
||||
upload_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt32(&running_threads, -1)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
func runDownload(thread_num int, keys *sync.Map) {
|
||||
|
@ -226,7 +233,7 @@ func runDownload(thread_num int, keys *sync.Map) {
|
|||
svc := s3.New(session.New(), cfg)
|
||||
|
||||
keys.Range(func(k, value interface{}) bool {
|
||||
if time.Now().After(endtime) {
|
||||
if duration_secs > -1 && time.Now().After(endtime) {
|
||||
// fmt.Println("time ended for download")
|
||||
return false
|
||||
}
|
||||
|
@ -247,8 +254,8 @@ func runDownload(thread_num int, keys *sync.Map) {
|
|||
err := req.Send()
|
||||
if err != nil {
|
||||
errcnt++
|
||||
atomic.AddInt32(&download_slowdown_count, 1)
|
||||
atomic.AddInt32(&download_count, -1)
|
||||
atomic.AddInt64(&download_slowdown_count, 1)
|
||||
atomic.AddInt64(&download_count, -1)
|
||||
fmt.Println("download err", err)
|
||||
//break
|
||||
}
|
||||
|
@ -258,14 +265,14 @@ func runDownload(thread_num int, keys *sync.Map) {
|
|||
if errcnt > 2 {
|
||||
return false
|
||||
}
|
||||
atomic.AddInt32(&download_count, 1)
|
||||
atomic.AddInt64(&download_count, 1)
|
||||
return true
|
||||
})
|
||||
|
||||
// Remember last done time
|
||||
download_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt32(&running_threads, -1)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
func runDelete(thread_num int, keys *sync.Map) {
|
||||
|
@ -291,8 +298,8 @@ func runDelete(thread_num int, keys *sync.Map) {
|
|||
err := req.Send()
|
||||
if err != nil {
|
||||
errcnt++
|
||||
atomic.AddInt32(&delete_slowdown_count, 1)
|
||||
atomic.AddInt32(&delete_count, -1)
|
||||
atomic.AddInt64(&delete_slowdown_count, 1)
|
||||
atomic.AddInt64(&delete_count, -1)
|
||||
fmt.Println("download err", err, "out", out.String())
|
||||
//break
|
||||
}
|
||||
|
@ -306,7 +313,7 @@ func runDelete(thread_num int, keys *sync.Map) {
|
|||
// Remember last done time
|
||||
delete_finish = time.Now()
|
||||
// One less thread
|
||||
atomic.AddInt32(&running_threads, -1)
|
||||
atomic.AddInt64(&running_threads, -1)
|
||||
}
|
||||
|
||||
var cfg *aws.Config
|
||||
|
@ -320,7 +327,8 @@ func init() {
|
|||
myflag.StringVar(&bucket_prefix, "p", "hotsauce_benchmark", "Prefix for buckets")
|
||||
myflag.IntVar(&bucket_count, "b", 1, "Number of buckets to distribute IOs across")
|
||||
myflag.StringVar(®ion, "r", "us-east-1", "Region for testing")
|
||||
myflag.IntVar(&duration_secs, "d", 60, "Duration of each test in seconds")
|
||||
myflag.Int64Var(&object_count, "n", -1, "Maximum number of objects <-1 for unlimited>")
|
||||
myflag.IntVar(&duration_secs, "d", 60, "Maximum test duration in seconds <-1 for unlimited>")
|
||||
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
||||
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
||||
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
|
||||
|
@ -329,6 +337,9 @@ func init() {
|
|||
}
|
||||
|
||||
// Check the arguments
|
||||
if object_count < 0 && duration_secs < 0 {
|
||||
log.Fatal("The number of objects and duration can not both be unlimited")
|
||||
}
|
||||
if access_key == "" {
|
||||
log.Fatal("Missing argument -a for access key.")
|
||||
}
|
||||
|
@ -391,7 +402,7 @@ func main() {
|
|||
|
||||
var keys []*sync.Map
|
||||
// Run the upload case
|
||||
running_threads = int32(threads)
|
||||
running_threads = int64(threads)
|
||||
starttime := time.Now()
|
||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||
for n := 0; n < threads; n++ {
|
||||
|
@ -400,7 +411,7 @@ func main() {
|
|||
}
|
||||
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt32(&running_threads) > 0 {
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
upload_time := upload_finish.Sub(starttime).Seconds()
|
||||
|
@ -412,7 +423,7 @@ func main() {
|
|||
uploadspeed = bps / bytefmt.MEGABYTE
|
||||
|
||||
// Run the download case
|
||||
running_threads = int32(threads)
|
||||
running_threads = int64(threads)
|
||||
starttime = time.Now()
|
||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||
for n := 0; n < threads; n++ {
|
||||
|
@ -420,7 +431,7 @@ func main() {
|
|||
}
|
||||
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt32(&running_threads) > 0 {
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
download_time := download_finish.Sub(starttime).Seconds()
|
||||
|
@ -432,7 +443,7 @@ func main() {
|
|||
downloadspeed = bps / bytefmt.MEGABYTE
|
||||
|
||||
// Run the delete case
|
||||
running_threads = int32(threads)
|
||||
running_threads = int64(threads)
|
||||
starttime = time.Now()
|
||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||
for n := 0; n < threads; n++ {
|
||||
|
@ -440,7 +451,7 @@ func main() {
|
|||
}
|
||||
|
||||
// Wait for it to finish
|
||||
for atomic.LoadInt32(&running_threads) > 0 {
|
||||
for atomic.LoadInt64(&running_threads) > 0 {
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
delete_time := delete_finish.Sub(starttime).Seconds()
|
||||
|
|
Loading…
Reference in New Issue