s3-benchmark: shard objects across buckets rather than threads.

Signed-off-by: Mark Nelson <mnelson@redhat.com>
master
Mark Nelson 2019-08-12 08:54:02 -04:00
parent 60225d82d7
commit b7d8e42b3a
1 changed files with 65 additions and 74 deletions

View File

@ -21,7 +21,6 @@ import (
"os"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@ -181,8 +180,8 @@ func setSignature(req *http.Request) {
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
}
func runUpload(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
func runUpload(thread_num int) {
// bucket_num := thread_num % bucket_count
errcnt := 0
svc := s3.New(session.New(), cfg)
for {
@ -190,6 +189,7 @@ func runUpload(thread_num int, keys *sync.Map) {
break
}
objnum := atomic.AddInt64(&upload_count, 1)
bucket_num := objnum % int64(bucket_count)
if object_count > -1 && objnum > object_count {
objnum = atomic.AddInt64(&upload_count, -1)
break
@ -218,7 +218,6 @@ func runUpload(thread_num int, keys *sync.Map) {
if errcnt > 2 {
break
}
keys.Store(key, nil)
fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key)
}
// Remember last done time
@ -227,89 +226,83 @@ func runUpload(thread_num int, keys *sync.Map) {
atomic.AddInt64(&running_threads, -1)
}
func runDownload(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
func runDownload(thread_num int) {
errcnt := 0
svc := s3.New(session.New(), cfg)
keys.Range(func(k, value interface{}) bool {
for {
if duration_secs > -1 && time.Now().After(endtime) {
// fmt.Println("time ended for download")
return false
}
var key string
var ok bool
if key, ok = k.(string); !ok {
log.Fatal("convert key back error")
}
break
}
fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
r := &s3.GetObjectInput{
Bucket: &buckets[bucket_num],
Key: &key,
}
req, resp := svc.GetObjectRequest(r)
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt64(&download_slowdown_count, 1)
objnum := atomic.AddInt64(&download_count, 1)
if objnum > object_count {
atomic.AddInt64(&download_count, -1)
fmt.Println("download err", err)
//break
}
if err == nil {
_, err = io.Copy(ioutil.Discard, resp.Body)
}
if errcnt > 2 {
return false
}
atomic.AddInt64(&download_count, 1)
return true
})
break
}
bucket_num := objnum % int64(bucket_count)
key := fmt.Sprintf("Object-%d", objnum)
fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
r := &s3.GetObjectInput{
Bucket: &buckets[bucket_num],
Key: &key,
}
req, resp := svc.GetObjectRequest(r)
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt64(&download_slowdown_count, 1)
atomic.AddInt64(&download_count, -1)
fmt.Println("download err", err)
//break
}
if err == nil {
_, err = io.Copy(ioutil.Discard, resp.Body)
}
if errcnt > 2 {
break
}
}
// Remember last done time
download_finish = time.Now()
// One less thread
atomic.AddInt64(&running_threads, -1)
}
func runDelete(thread_num int, keys *sync.Map) {
bucket_num := thread_num % bucket_count
func runDelete(thread_num int) {
errcnt := 0
svc := s3.New(session.New(), cfg)
keys.Range(func(k, value interface{}) bool {
var key string
var ok bool
if key, ok = k.(string); !ok {
log.Fatal("convert key back error")
for {
objnum := atomic.AddInt64(&delete_count, 1)
if objnum > object_count {
atomic.AddInt64(&delete_count, -1)
break
}
bucket_num := objnum % int64(bucket_count)
key := fmt.Sprintf("Object-%d", objnum)
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
r := &s3.DeleteObjectInput{
Bucket: &buckets[bucket_num],
Key: &key,
}
req, out := svc.DeleteObjectRequest(r)
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt64(&delete_slowdown_count, 1)
atomic.AddInt64(&delete_count, -1)
fmt.Println("delete err", err, "out", out.String())
}
if errcnt > 2 {
break
}
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
r := &s3.DeleteObjectInput{
Bucket: &buckets[bucket_num],
Key: &key,
}
req, out := svc.DeleteObjectRequest(r)
err := req.Send()
if err != nil {
errcnt++
atomic.AddInt64(&delete_slowdown_count, 1)
atomic.AddInt64(&delete_count, -1)
fmt.Println("download err", err, "out", out.String())
//break
}
if errcnt > 2 {
return false
}
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
return true
})
}
// Remember last done time
delete_finish = time.Now()
// One less thread
@ -400,14 +393,12 @@ func main() {
delete_count = 0
delete_slowdown_count = 0
var keys []*sync.Map
// Run the upload case
running_threads = int64(threads)
starttime := time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
keys = append(keys, &sync.Map{})
go runUpload(n, keys[n])
go runUpload(n)
}
// Wait for it to finish
@ -427,7 +418,7 @@ func main() {
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runDownload(n, keys[n])
go runDownload(n)
}
// Wait for it to finish
@ -447,7 +438,7 @@ func main() {
starttime = time.Now()
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
for n := 0; n < threads; n++ {
go runDelete(n, keys[n])
go runDelete(n)
}
// Wait for it to finish