From b7d8e42b3a25028baf16beca9862b3e8c519f947 Mon Sep 17 00:00:00 2001 From: Mark Nelson Date: Mon, 12 Aug 2019 08:54:02 -0400 Subject: [PATCH] s3-benchmark: shard objects across buckets rather than threads. Signed-off-by: Mark Nelson --- s3-benchmark.go | 139 ++++++++++++++++++++++-------------------------- 1 file changed, 65 insertions(+), 74 deletions(-) diff --git a/s3-benchmark.go b/s3-benchmark.go index e1dd05c..5bc46c8 100644 --- a/s3-benchmark.go +++ b/s3-benchmark.go @@ -21,7 +21,6 @@ import ( "os" "sort" "strings" - "sync" "sync/atomic" "time" @@ -181,8 +180,8 @@ func setSignature(req *http.Request) { req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature)) } -func runUpload(thread_num int, keys *sync.Map) { - bucket_num := thread_num % bucket_count +func runUpload(thread_num int) { +// bucket_num := thread_num % bucket_count errcnt := 0 svc := s3.New(session.New(), cfg) for { @@ -190,6 +189,7 @@ func runUpload(thread_num int, keys *sync.Map) { break } objnum := atomic.AddInt64(&upload_count, 1) + bucket_num := objnum % int64(bucket_count) if object_count > -1 && objnum > object_count { objnum = atomic.AddInt64(&upload_count, -1) break @@ -218,7 +218,6 @@ func runUpload(thread_num int, keys *sync.Map) { if errcnt > 2 { break } - keys.Store(key, nil) fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key) } // Remember last done time @@ -227,89 +226,83 @@ func runUpload(thread_num int, keys *sync.Map) { atomic.AddInt64(&running_threads, -1) } -func runDownload(thread_num int, keys *sync.Map) { - bucket_num := thread_num % bucket_count +func runDownload(thread_num int) { errcnt := 0 svc := s3.New(session.New(), cfg) - - keys.Range(func(k, value interface{}) bool { + for { if duration_secs > -1 && time.Now().After(endtime) { - // fmt.Println("time ended for download") - return false - } - var key string - var ok bool - if key, ok = k.(string); !ok { - log.Fatal("convert key back error") - } + break + } - fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key) - - r := &s3.GetObjectInput{ - Bucket: &buckets[bucket_num], - Key: &key, - } - - req, resp := svc.GetObjectRequest(r) - err := req.Send() - if err != nil { - errcnt++ - atomic.AddInt64(&download_slowdown_count, 1) + objnum := atomic.AddInt64(&download_count, 1) + if objnum > object_count { atomic.AddInt64(&download_count, -1) - fmt.Println("download err", err) - //break - } - if err == nil { - _, err = io.Copy(ioutil.Discard, resp.Body) - } - if errcnt > 2 { - return false - } - atomic.AddInt64(&download_count, 1) - return true - }) + break + } + bucket_num := objnum % int64(bucket_count) + key := fmt.Sprintf("Object-%d", objnum) + fmt.Fprintf(os.Stderr, "download thread %v, %v\r", thread_num, key) + r := &s3.GetObjectInput{ + Bucket: &buckets[bucket_num], + Key: &key, + } + + req, resp := svc.GetObjectRequest(r) + err := req.Send() + if err != nil { + errcnt++ + atomic.AddInt64(&download_slowdown_count, 1) + atomic.AddInt64(&download_count, -1) + fmt.Println("download err", err) + //break + } + if err == nil { + _, err = io.Copy(ioutil.Discard, resp.Body) + } + if errcnt > 2 { + break + } + } // Remember last done time download_finish = time.Now() // One less thread atomic.AddInt64(&running_threads, -1) } -func runDelete(thread_num int, keys *sync.Map) { - bucket_num := thread_num % bucket_count +func runDelete(thread_num int) { errcnt := 0 svc := s3.New(session.New(), cfg) - keys.Range(func(k, value interface{}) bool { - var key string - var ok bool - if key, ok = k.(string); !ok { - log.Fatal("convert key back error") + for { + objnum := atomic.AddInt64(&delete_count, 1) + if objnum > object_count { + atomic.AddInt64(&delete_count, -1) + break + } + + bucket_num := objnum % int64(bucket_count) + + key := fmt.Sprintf("Object-%d", objnum) + fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key) + r := &s3.DeleteObjectInput{ + Bucket: &buckets[bucket_num], + Key: &key, } + req, out := svc.DeleteObjectRequest(r) + err := req.Send() + if err != nil { + errcnt++ + atomic.AddInt64(&delete_slowdown_count, 1) + atomic.AddInt64(&delete_count, -1) + fmt.Println("delete err", err, "out", out.String()) + } + if errcnt > 2 { + break + } fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key) - - r := &s3.DeleteObjectInput{ - Bucket: &buckets[bucket_num], - Key: &key, - } - - req, out := svc.DeleteObjectRequest(r) - err := req.Send() - if err != nil { - errcnt++ - atomic.AddInt64(&delete_slowdown_count, 1) - atomic.AddInt64(&delete_count, -1) - fmt.Println("download err", err, "out", out.String()) - //break - } - if errcnt > 2 { - return false - } - fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key) - return true - }) - + } // Remember last done time delete_finish = time.Now() // One less thread @@ -400,14 +393,12 @@ func main() { delete_count = 0 delete_slowdown_count = 0 - var keys []*sync.Map // Run the upload case running_threads = int64(threads) starttime := time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { - keys = append(keys, &sync.Map{}) - go runUpload(n, keys[n]) + go runUpload(n) } // Wait for it to finish @@ -427,7 +418,7 @@ func main() { starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { - go runDownload(n, keys[n]) + go runDownload(n) } // Wait for it to finish @@ -447,7 +438,7 @@ func main() { starttime = time.Now() endtime = starttime.Add(time.Second * time.Duration(duration_secs)) for n := 0; n < threads; n++ { - go runDelete(n, keys[n]) + go runDelete(n) } // Wait for it to finish