Merge pull request #4 from markhpc/wip-shard-objects-buckets
s3-benchmark: shard objects across buckets rather than threads.master
commit
7f3aa829d3
139
s3-benchmark.go
139
s3-benchmark.go
|
@ -21,7 +21,6 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -181,8 +180,8 @@ func setSignature(req *http.Request) {
|
||||||
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
|
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runUpload(thread_num int, keys *sync.Map) {
|
func runUpload(thread_num int) {
|
||||||
bucket_num := thread_num % bucket_count
|
// bucket_num := thread_num % bucket_count
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
for {
|
for {
|
||||||
|
@ -190,6 +189,7 @@ func runUpload(thread_num int, keys *sync.Map) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
objnum := atomic.AddInt64(&upload_count, 1)
|
objnum := atomic.AddInt64(&upload_count, 1)
|
||||||
|
bucket_num := objnum % int64(bucket_count)
|
||||||
if object_count > -1 && objnum > object_count {
|
if object_count > -1 && objnum > object_count {
|
||||||
objnum = atomic.AddInt64(&upload_count, -1)
|
objnum = atomic.AddInt64(&upload_count, -1)
|
||||||
break
|
break
|
||||||
|
@ -218,7 +218,6 @@ func runUpload(thread_num int, keys *sync.Map) {
|
||||||
if errcnt > 2 {
|
if errcnt > 2 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
keys.Store(key, nil)
|
|
||||||
fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key)
|
fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key)
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
|
@ -227,89 +226,83 @@ func runUpload(thread_num int, keys *sync.Map) {
|
||||||
atomic.AddInt64(&running_threads, -1)
|
atomic.AddInt64(&running_threads, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runDownload(thread_num int, keys *sync.Map) {
|
func runDownload(thread_num int) {
|
||||||
bucket_num := thread_num % bucket_count
|
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
|
for {
|
||||||
keys.Range(func(k, value interface{}) bool {
|
|
||||||
if duration_secs > -1 && time.Now().After(endtime) {
|
if duration_secs > -1 && time.Now().After(endtime) {
|
||||||
// fmt.Println("time ended for download")
|
break
|
||||||
return false
|
}
|
||||||
}
|
|
||||||
var key string
|
|
||||||
var ok bool
|
|
||||||
if key, ok = k.(string); !ok {
|
|
||||||
log.Fatal("convert key back error")
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
|
objnum := atomic.AddInt64(&download_count, 1)
|
||||||
|
if objnum > object_count {
|
||||||
r := &s3.GetObjectInput{
|
|
||||||
Bucket: &buckets[bucket_num],
|
|
||||||
Key: &key,
|
|
||||||
}
|
|
||||||
|
|
||||||
req, resp := svc.GetObjectRequest(r)
|
|
||||||
err := req.Send()
|
|
||||||
if err != nil {
|
|
||||||
errcnt++
|
|
||||||
atomic.AddInt64(&download_slowdown_count, 1)
|
|
||||||
atomic.AddInt64(&download_count, -1)
|
atomic.AddInt64(&download_count, -1)
|
||||||
fmt.Println("download err", err)
|
break
|
||||||
//break
|
}
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
_, err = io.Copy(ioutil.Discard, resp.Body)
|
|
||||||
}
|
|
||||||
if errcnt > 2 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
atomic.AddInt64(&download_count, 1)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
|
bucket_num := objnum % int64(bucket_count)
|
||||||
|
key := fmt.Sprintf("Object-%d", objnum)
|
||||||
|
fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key)
|
||||||
|
r := &s3.GetObjectInput{
|
||||||
|
Bucket: &buckets[bucket_num],
|
||||||
|
Key: &key,
|
||||||
|
}
|
||||||
|
|
||||||
|
req, resp := svc.GetObjectRequest(r)
|
||||||
|
err := req.Send()
|
||||||
|
if err != nil {
|
||||||
|
errcnt++
|
||||||
|
atomic.AddInt64(&download_slowdown_count, 1)
|
||||||
|
atomic.AddInt64(&download_count, -1)
|
||||||
|
fmt.Println("download err", err)
|
||||||
|
//break
|
||||||
|
}
|
||||||
|
if err == nil {
|
||||||
|
_, err = io.Copy(ioutil.Discard, resp.Body)
|
||||||
|
}
|
||||||
|
if errcnt > 2 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
download_finish = time.Now()
|
download_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
atomic.AddInt64(&running_threads, -1)
|
atomic.AddInt64(&running_threads, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runDelete(thread_num int, keys *sync.Map) {
|
func runDelete(thread_num int) {
|
||||||
bucket_num := thread_num % bucket_count
|
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
|
|
||||||
keys.Range(func(k, value interface{}) bool {
|
for {
|
||||||
var key string
|
objnum := atomic.AddInt64(&delete_count, 1)
|
||||||
var ok bool
|
if objnum > object_count {
|
||||||
if key, ok = k.(string); !ok {
|
atomic.AddInt64(&delete_count, -1)
|
||||||
log.Fatal("convert key back error")
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
bucket_num := objnum % int64(bucket_count)
|
||||||
|
|
||||||
|
key := fmt.Sprintf("Object-%d", objnum)
|
||||||
|
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
||||||
|
r := &s3.DeleteObjectInput{
|
||||||
|
Bucket: &buckets[bucket_num],
|
||||||
|
Key: &key,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
req, out := svc.DeleteObjectRequest(r)
|
||||||
|
err := req.Send()
|
||||||
|
if err != nil {
|
||||||
|
errcnt++
|
||||||
|
atomic.AddInt64(&delete_slowdown_count, 1)
|
||||||
|
atomic.AddInt64(&delete_count, -1)
|
||||||
|
fmt.Println("delete err", err, "out", out.String())
|
||||||
|
}
|
||||||
|
if errcnt > 2 {
|
||||||
|
break
|
||||||
|
}
|
||||||
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
||||||
|
}
|
||||||
r := &s3.DeleteObjectInput{
|
|
||||||
Bucket: &buckets[bucket_num],
|
|
||||||
Key: &key,
|
|
||||||
}
|
|
||||||
|
|
||||||
req, out := svc.DeleteObjectRequest(r)
|
|
||||||
err := req.Send()
|
|
||||||
if err != nil {
|
|
||||||
errcnt++
|
|
||||||
atomic.AddInt64(&delete_slowdown_count, 1)
|
|
||||||
atomic.AddInt64(&delete_count, -1)
|
|
||||||
fmt.Println("download err", err, "out", out.String())
|
|
||||||
//break
|
|
||||||
}
|
|
||||||
if errcnt > 2 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
delete_finish = time.Now()
|
delete_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
|
@ -400,14 +393,12 @@ func main() {
|
||||||
delete_count = 0
|
delete_count = 0
|
||||||
delete_slowdown_count = 0
|
delete_slowdown_count = 0
|
||||||
|
|
||||||
var keys []*sync.Map
|
|
||||||
// Run the upload case
|
// Run the upload case
|
||||||
running_threads = int64(threads)
|
running_threads = int64(threads)
|
||||||
starttime := time.Now()
|
starttime := time.Now()
|
||||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||||
for n := 0; n < threads; n++ {
|
for n := 0; n < threads; n++ {
|
||||||
keys = append(keys, &sync.Map{})
|
go runUpload(n)
|
||||||
go runUpload(n, keys[n])
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for it to finish
|
// Wait for it to finish
|
||||||
|
@ -427,7 +418,7 @@ func main() {
|
||||||
starttime = time.Now()
|
starttime = time.Now()
|
||||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||||
for n := 0; n < threads; n++ {
|
for n := 0; n < threads; n++ {
|
||||||
go runDownload(n, keys[n])
|
go runDownload(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for it to finish
|
// Wait for it to finish
|
||||||
|
@ -447,7 +438,7 @@ func main() {
|
||||||
starttime = time.Now()
|
starttime = time.Now()
|
||||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
||||||
for n := 0; n < threads; n++ {
|
for n := 0; n < threads; n++ {
|
||||||
go runDelete(n, keys[n])
|
go runDelete(n)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for it to finish
|
// Wait for it to finish
|
||||||
|
|
Loading…
Reference in New Issue