commit
dd4d6e7ca3
471
s3-benchmark.go
471
s3-benchmark.go
|
@ -15,15 +15,17 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"code.cloudfoundry.org/bytefmt"
|
"code.cloudfoundry.org/bytefmt"
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
|
@ -37,9 +39,9 @@ var buckets []string
|
||||||
var duration_secs, threads, loops, bucket_count int
|
var duration_secs, threads, loops, bucket_count int
|
||||||
var object_data []byte
|
var object_data []byte
|
||||||
var object_data_md5 string
|
var object_data_md5 string
|
||||||
var object_size uint64
|
var running_threads, object_count, object_size, op_count int64
|
||||||
var running_threads, object_count, upload_count, download_count, delete_count, upload_slowdown_count, download_slowdown_count, delete_slowdown_count int64
|
|
||||||
var endtime, upload_finish, download_finish, delete_finish time.Time
|
var endtime, upload_finish, download_finish, delete_finish time.Time
|
||||||
|
var interval float64
|
||||||
|
|
||||||
func logit(msg string) {
|
func logit(msg string) {
|
||||||
fmt.Println(msg)
|
fmt.Println(msg)
|
||||||
|
@ -96,7 +98,6 @@ func getS3Client() *s3.S3 {
|
||||||
|
|
||||||
func createBucket(bucket_num int, ignore_errors bool) {
|
func createBucket(bucket_num int, ignore_errors bool) {
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
log.Printf(buckets[bucket_num])
|
|
||||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||||
if _, err := svc.CreateBucket(in); err != nil {
|
if _, err := svc.CreateBucket(in); err != nil {
|
||||||
if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
|
if strings.Contains(err.Error(), s3.ErrCodeBucketAlreadyOwnedByYou) ||
|
||||||
|
@ -180,18 +181,248 @@ func setSignature(req *http.Request) {
|
||||||
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
|
req.Header.Set("Authorization", fmt.Sprintf("AWS %s:%s", access_key, signature))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runUpload(thread_num int) {
|
type IntervalStats struct {
|
||||||
// bucket_num := thread_num % bucket_count
|
bytes int64
|
||||||
|
slowdowns int64
|
||||||
|
latNano []int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type ThreadStats struct {
|
||||||
|
start int64
|
||||||
|
curInterval int64
|
||||||
|
intervals []IntervalStats
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeThreadStats(s int64, intervalNano int64) ThreadStats {
|
||||||
|
ts := ThreadStats{start: s, curInterval: -1}
|
||||||
|
ts.updateIntervals(intervalNano)
|
||||||
|
return ts
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *ThreadStats) updateIntervals(intervalNano int64) int64 {
|
||||||
|
if intervalNano < 0 {
|
||||||
|
return ts.curInterval
|
||||||
|
}
|
||||||
|
for ts.start + intervalNano*ts.curInterval < time.Now().UnixNano() {
|
||||||
|
ts.intervals = append(ts.intervals, IntervalStats{0, 0, []int64{}})
|
||||||
|
ts.curInterval++
|
||||||
|
}
|
||||||
|
return ts.curInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *ThreadStats) finish() {
|
||||||
|
ts.curInterval = -1
|
||||||
|
}
|
||||||
|
|
||||||
|
type Stats struct {
|
||||||
|
// threads
|
||||||
|
threads int
|
||||||
|
// The loop we are in
|
||||||
|
loop int
|
||||||
|
// Test mode being run
|
||||||
|
mode string
|
||||||
|
// start time in nanoseconds
|
||||||
|
startNano int64
|
||||||
|
// end time in nanoseconds
|
||||||
|
endNano int64
|
||||||
|
// Duration in nanoseconds for each interval
|
||||||
|
intervalNano int64
|
||||||
|
// Per-thread statistics
|
||||||
|
threadStats []ThreadStats
|
||||||
|
// a map of counters of how many threads have finished given interval of stats
|
||||||
|
intervalCompletions sync.Map
|
||||||
|
// a counter of how many threads have finished updating stats entirely
|
||||||
|
completions int32
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeStats(loop int, mode string, threads int, intervalNano int64) Stats {
|
||||||
|
start := time.Now().UnixNano()
|
||||||
|
s := Stats{threads, loop, mode, start, 0, intervalNano, []ThreadStats{}, sync.Map{}, 0}
|
||||||
|
for i := 0; i < threads; i++ {
|
||||||
|
s.threadStats = append(s.threadStats, makeThreadStats(start, s.intervalNano))
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) _getIntervalStats(i int64) IntervalStats {
|
||||||
|
bytes := int64(0)
|
||||||
|
ops := int64(0)
|
||||||
|
slowdowns := int64(0);
|
||||||
|
|
||||||
|
for t := 0; t < stats.threads; t++ {
|
||||||
|
bytes += stats.threadStats[t].intervals[i].bytes
|
||||||
|
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||||
|
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||||
|
}
|
||||||
|
// Aggregate the per-thread Latency slice
|
||||||
|
tmpLat := make([]int64, ops)
|
||||||
|
var c int
|
||||||
|
for t := 0; t < stats.threads; t++ {
|
||||||
|
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||||
|
}
|
||||||
|
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||||
|
return IntervalStats{bytes, slowdowns, tmpLat}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) _getTotalStats() IntervalStats {
|
||||||
|
bytes := int64(0)
|
||||||
|
ops := int64(0)
|
||||||
|
slowdowns := int64(0);
|
||||||
|
|
||||||
|
for t := 0; t < stats.threads; t++ {
|
||||||
|
for i := 0; i < len(stats.threadStats[t].intervals); i++ {
|
||||||
|
bytes += stats.threadStats[t].intervals[i].bytes
|
||||||
|
ops += int64(len(stats.threadStats[t].intervals[i].latNano))
|
||||||
|
slowdowns += stats.threadStats[t].intervals[i].slowdowns
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Aggregate the per-thread Latency slice
|
||||||
|
tmpLat := make([]int64, ops)
|
||||||
|
var c int
|
||||||
|
for t := 0; t < stats.threads; t++ {
|
||||||
|
for i := 0; i < len(stats.threadStats[t].intervals); i++ {
|
||||||
|
c += copy(tmpLat[c:], stats.threadStats[t].intervals[i].latNano)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sort.Slice(tmpLat, func(i, j int) bool { return tmpLat[i] < tmpLat[j] })
|
||||||
|
return IntervalStats{bytes, slowdowns, tmpLat}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) logI(i int64) bool {
|
||||||
|
// Check bounds first
|
||||||
|
if stats.intervalNano < 0 || i < 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Not safe to log if not all writers have completed.
|
||||||
|
value, ok := stats.intervalCompletions.Load(i)
|
||||||
|
if !ok {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
cp, ok := value.(*int32)
|
||||||
|
if !ok {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
count := atomic.LoadInt32(cp)
|
||||||
|
if count < int32(stats.threads) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats._log(strconv.FormatInt(i, 10), stats.intervalNano, stats._getIntervalStats(i))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) log() bool {
|
||||||
|
// Not safe to log if not all writers have completed.
|
||||||
|
completions := atomic.LoadInt32(&stats.completions)
|
||||||
|
if (completions < int32(stats.threads)) {
|
||||||
|
log.Printf("log, completions: %d", completions)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return stats._log("ALL", stats.endNano - stats.startNano, stats._getTotalStats())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) _log(intervalName string, intervalNano int64, intervalStats IntervalStats) bool {
|
||||||
|
// Compute and log the stats
|
||||||
|
ops := len(intervalStats.latNano)
|
||||||
|
totalLat := int64(0);
|
||||||
|
minLat := float64(0);
|
||||||
|
maxLat := float64(0);
|
||||||
|
NinetyNineLat := float64(0);
|
||||||
|
avgLat := float64(0);
|
||||||
|
if ops > 0 {
|
||||||
|
minLat = float64(intervalStats.latNano[0]) / 1000000
|
||||||
|
maxLat = float64(intervalStats.latNano[ops - 1]) / 1000000
|
||||||
|
for i := range intervalStats.latNano {
|
||||||
|
totalLat += intervalStats.latNano[i]
|
||||||
|
}
|
||||||
|
avgLat = float64(totalLat) / float64(ops) / 1000000
|
||||||
|
NintyNineLatNano := intervalStats.latNano[int64(math.Round(0.99*float64(ops))) - 1]
|
||||||
|
NinetyNineLat = float64(NintyNineLatNano) / 1000000
|
||||||
|
}
|
||||||
|
seconds := float64(intervalNano) / 1000000000
|
||||||
|
mbps := float64(intervalStats.bytes) / seconds / bytefmt.MEGABYTE
|
||||||
|
iops := float64(ops) / seconds
|
||||||
|
|
||||||
|
log.Printf(
|
||||||
|
"Loop: %d, Int: %s, Dur(s): %.1f, Mode: %s, Ops: %d, MB/s: %.2f, IO/s: %.0f, Lat(ms): [ min: %.1f, avg: %.1f, 99%%: %.1f, max: %.1f ], Slowdowns: %d",
|
||||||
|
stats.loop,
|
||||||
|
intervalName,
|
||||||
|
seconds,
|
||||||
|
stats.mode,
|
||||||
|
ops,
|
||||||
|
mbps,
|
||||||
|
iops,
|
||||||
|
minLat,
|
||||||
|
avgLat,
|
||||||
|
NinetyNineLat,
|
||||||
|
maxLat,
|
||||||
|
intervalStats.slowdowns)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only safe to call from the calling thread
|
||||||
|
func (stats *Stats) updateIntervals(thread_num int) int64 {
|
||||||
|
curInterval := stats.threadStats[thread_num].curInterval
|
||||||
|
newInterval := stats.threadStats[thread_num].updateIntervals(stats.intervalNano)
|
||||||
|
|
||||||
|
// Finish has already been called
|
||||||
|
if curInterval < 0 {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := curInterval; i < newInterval; i++ {
|
||||||
|
// load or store the current value
|
||||||
|
value, _ := stats.intervalCompletions.LoadOrStore(i, new(int32))
|
||||||
|
cp, ok := value.(*int32)
|
||||||
|
if !ok {
|
||||||
|
log.Printf("updateIntervals: got data of type %T but wanted *int32", value)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
count := atomic.AddInt32(cp, 1)
|
||||||
|
if count == int32(stats.threads) {
|
||||||
|
stats.logI(i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) addOp(thread_num int, bytes int64, latNano int64) {
|
||||||
|
|
||||||
|
// Interval statistics
|
||||||
|
cur := stats.threadStats[thread_num].curInterval
|
||||||
|
if cur < 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stats.threadStats[thread_num].intervals[cur].bytes += bytes
|
||||||
|
stats.threadStats[thread_num].intervals[cur].latNano =
|
||||||
|
append(stats.threadStats[thread_num].intervals[cur].latNano, latNano)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) addSlowDown(thread_num int) {
|
||||||
|
cur := stats.threadStats[thread_num].curInterval
|
||||||
|
stats.threadStats[thread_num].intervals[cur].slowdowns++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (stats *Stats) finish(thread_num int) {
|
||||||
|
stats.threadStats[thread_num].updateIntervals(stats.intervalNano)
|
||||||
|
stats.threadStats[thread_num].finish()
|
||||||
|
count := atomic.AddInt32(&stats.completions, 1)
|
||||||
|
if count == int32(stats.threads) {
|
||||||
|
stats.endNano = time.Now().UnixNano()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runUpload(thread_num int, fendtime time.Time, stats *Stats) {
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
for {
|
for {
|
||||||
if duration_secs > -1 && time.Now().After(endtime) {
|
if duration_secs > -1 && time.Now().After(endtime) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
objnum := atomic.AddInt64(&upload_count, 1)
|
objnum := atomic.AddInt64(&op_count, 1)
|
||||||
bucket_num := objnum % int64(bucket_count)
|
bucket_num := objnum % int64(bucket_count)
|
||||||
if object_count > -1 && objnum > object_count {
|
if object_count > -1 && objnum > object_count {
|
||||||
objnum = atomic.AddInt64(&upload_count, -1)
|
objnum = atomic.AddInt64(&op_count, -1)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fileobj := bytes.NewReader(object_data)
|
fileobj := bytes.NewReader(object_data)
|
||||||
|
@ -202,30 +433,36 @@ func runUpload(thread_num int) {
|
||||||
Key: &key,
|
Key: &key,
|
||||||
Body: fileobj,
|
Body: fileobj,
|
||||||
}
|
}
|
||||||
|
start := time.Now().UnixNano()
|
||||||
req, _ := svc.PutObjectRequest(r)
|
req, _ := svc.PutObjectRequest(r)
|
||||||
// Disable payload checksum calculation (very expensive)
|
// Disable payload checksum calculation (very expensive)
|
||||||
req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
|
req.HTTPRequest.Header.Add("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
|
||||||
err := req.Send()
|
err := req.Send()
|
||||||
|
end := time.Now().UnixNano()
|
||||||
|
stats.updateIntervals(thread_num)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errcnt++
|
errcnt++
|
||||||
atomic.AddInt64(&upload_slowdown_count, 1)
|
stats.addSlowDown(thread_num);
|
||||||
atomic.AddInt64(&upload_count, -1)
|
atomic.AddInt64(&op_count, -1)
|
||||||
fmt.Println("upload err", err)
|
fmt.Println("upload err", err)
|
||||||
//break
|
} else {
|
||||||
|
// Update the stats
|
||||||
|
stats.addOp(thread_num, object_size, end-start)
|
||||||
}
|
}
|
||||||
if errcnt > 2 {
|
if errcnt > 2 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Fprintf(os.Stderr, "upload thread %5v, %v\r", thread_num, key)
|
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
upload_finish = time.Now()
|
upload_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
atomic.AddInt64(&running_threads, -1)
|
atomic.AddInt64(&running_threads, -1)
|
||||||
|
// stats are done
|
||||||
|
stats.finish(thread_num)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runDownload(thread_num int) {
|
func runDownload(thread_num int, fendtime time.Time, stats *Stats) {
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
for {
|
for {
|
||||||
|
@ -233,9 +470,9 @@ func runDownload(thread_num int) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
objnum := atomic.AddInt64(&download_count, 1)
|
objnum := atomic.AddInt64(&op_count, 1)
|
||||||
if objnum > object_count {
|
if objnum > object_count {
|
||||||
atomic.AddInt64(&download_count, -1)
|
atomic.AddInt64(&op_count, -1)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -246,37 +483,45 @@ func runDownload(thread_num int) {
|
||||||
Key: &key,
|
Key: &key,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now().UnixNano()
|
||||||
req, resp := svc.GetObjectRequest(r)
|
req, resp := svc.GetObjectRequest(r)
|
||||||
err := req.Send()
|
err := req.Send()
|
||||||
|
end := time.Now().UnixNano()
|
||||||
|
stats.updateIntervals(thread_num)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errcnt++
|
errcnt++
|
||||||
atomic.AddInt64(&download_slowdown_count, 1)
|
stats.addSlowDown(thread_num);
|
||||||
atomic.AddInt64(&download_count, -1)
|
|
||||||
fmt.Println("download err", err)
|
fmt.Println("download err", err)
|
||||||
//break
|
} else {
|
||||||
}
|
// Update the stats
|
||||||
|
stats.addOp(thread_num, object_size, end-start)
|
||||||
|
}
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
_, err = io.Copy(ioutil.Discard, resp.Body)
|
_, err = io.Copy(ioutil.Discard, resp.Body)
|
||||||
}
|
}
|
||||||
if errcnt > 2 {
|
if errcnt > 2 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Fprintf(os.Stderr, "download thread %5v, %v\r", thread_num, key)
|
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
download_finish = time.Now()
|
download_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
atomic.AddInt64(&running_threads, -1)
|
atomic.AddInt64(&running_threads, -1)
|
||||||
|
// stats are done
|
||||||
|
stats.finish(thread_num)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runDelete(thread_num int) {
|
func runDelete(thread_num int, stats *Stats) {
|
||||||
errcnt := 0
|
errcnt := 0
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
objnum := atomic.AddInt64(&delete_count, 1)
|
objnum := atomic.AddInt64(&op_count, 1)
|
||||||
if objnum > object_count {
|
if objnum > object_count {
|
||||||
atomic.AddInt64(&delete_count, -1)
|
atomic.AddInt64(&op_count, -1)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,119 +533,84 @@ func runDelete(thread_num int) {
|
||||||
Key: &key,
|
Key: &key,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start := time.Now().UnixNano()
|
||||||
req, out := svc.DeleteObjectRequest(r)
|
req, out := svc.DeleteObjectRequest(r)
|
||||||
err := req.Send()
|
err := req.Send()
|
||||||
|
end := time.Now().UnixNano()
|
||||||
|
stats.updateIntervals(thread_num)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errcnt++
|
errcnt++
|
||||||
atomic.AddInt64(&delete_slowdown_count, 1)
|
stats.addSlowDown(thread_num);
|
||||||
atomic.AddInt64(&delete_count, -1)
|
|
||||||
fmt.Println("delete err", err, "out", out.String())
|
fmt.Println("delete err", err, "out", out.String())
|
||||||
}
|
} else {
|
||||||
|
// Update the stats
|
||||||
|
stats.addOp(thread_num, object_size, end-start)
|
||||||
|
}
|
||||||
if errcnt > 2 {
|
if errcnt > 2 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Fprintf(os.Stderr, "delete thread %5v, %v\r", thread_num, key)
|
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
delete_finish = time.Now()
|
delete_finish = time.Now()
|
||||||
// One less thread
|
// One less thread
|
||||||
atomic.AddInt64(&running_threads, -1)
|
atomic.AddInt64(&running_threads, -1)
|
||||||
|
// stats are done
|
||||||
|
stats.finish(thread_num)
|
||||||
}
|
}
|
||||||
|
|
||||||
var cfg *aws.Config
|
var cfg *aws.Config
|
||||||
|
|
||||||
func init_buckets(loop int) {
|
func initBuckets(loop int, stats *Stats) {
|
||||||
// Initialize data for the bucket
|
|
||||||
object_data = make([]byte, object_size)
|
|
||||||
rand.Read(object_data)
|
|
||||||
hasher := md5.New()
|
|
||||||
hasher.Write(object_data)
|
|
||||||
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
|
||||||
|
|
||||||
// Create the buckets and delete all the objects
|
// Create the buckets and delete all the objects
|
||||||
starttime := time.Now()
|
|
||||||
for i := 0; i < bucket_count; i++ {
|
for i := 0; i < bucket_count; i++ {
|
||||||
|
start := time.Now().UnixNano()
|
||||||
createBucket(i, true)
|
createBucket(i, true)
|
||||||
deleteAllObjects(i)
|
deleteAllObjects(i)
|
||||||
|
end := time.Now().UnixNano()
|
||||||
|
stats.updateIntervals(0)
|
||||||
|
stats.addOp(0, 0, end-start)
|
||||||
}
|
}
|
||||||
init_time := time.Now().Sub(starttime).Seconds()
|
stats.finish(0)
|
||||||
|
|
||||||
logit(fmt.Sprintf("Loop %d: INIT time %.1f secs, buckets = %d, speed = %.1f buckets/sec.",
|
|
||||||
loop, init_time, bucket_count, float64(bucket_count)/init_time))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func run_put(loop int) float64 {
|
func runWrapper(loop int, r rune) {
|
||||||
// reset counters
|
op_count = 0
|
||||||
upload_count = 0
|
running_threads = int64(threads)
|
||||||
upload_slowdown_count = 0
|
intervalNano := int64(interval*1000000000)
|
||||||
|
endtime = time.Now().Add(time.Second * time.Duration(duration_secs))
|
||||||
|
var stats Stats
|
||||||
|
|
||||||
running_threads = int64(threads)
|
switch r {
|
||||||
starttime := time.Now()
|
case 'i':
|
||||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
log.Printf("Running Loop %d Init", loop)
|
||||||
for n := 0; n < threads; n++ {
|
stats = makeStats(loop, "INIT", 1, intervalNano)
|
||||||
go runUpload(n)
|
initBuckets(loop, &stats);
|
||||||
|
running_threads = 0;
|
||||||
|
case 'p':
|
||||||
|
log.Printf("Running Loop %d Put Test", loop)
|
||||||
|
stats = makeStats(loop, "PUT", threads, intervalNano)
|
||||||
|
for n := 0; n < threads; n++ {
|
||||||
|
go runUpload(n, endtime, &stats);
|
||||||
|
}
|
||||||
|
case 'g':
|
||||||
|
log.Printf("Running Loop %d Get Test", loop)
|
||||||
|
stats = makeStats(loop, "GET", threads, intervalNano)
|
||||||
|
for n := 0; n < threads; n++ {
|
||||||
|
go runDownload(n, endtime, &stats);
|
||||||
|
}
|
||||||
|
case 'd':
|
||||||
|
log.Printf("Running Loop %d Del Test", loop)
|
||||||
|
stats = makeStats(loop, "DEL", threads, intervalNano)
|
||||||
|
for n := 0; n < threads; n++ {
|
||||||
|
go runDelete(n, &stats);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// Wait for it to finish
|
||||||
// Wait for it to finish
|
for atomic.LoadInt64(&running_threads) > 0 {
|
||||||
for atomic.LoadInt64(&running_threads) > 0 {
|
time.Sleep(time.Millisecond)
|
||||||
time.Sleep(time.Millisecond)
|
}
|
||||||
}
|
stats.log()
|
||||||
upload_time := upload_finish.Sub(starttime).Seconds()
|
|
||||||
|
|
||||||
bps := float64(uint64(upload_count)*object_size) / upload_time
|
|
||||||
logit(fmt.Sprintf("Loop %d: PUT time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
|
|
||||||
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
|
|
||||||
|
|
||||||
return bps / bytefmt.MEGABYTE
|
|
||||||
}
|
|
||||||
|
|
||||||
func run_get(loop int) float64 {
|
|
||||||
// reset counters
|
|
||||||
download_count = 0
|
|
||||||
download_slowdown_count = 0
|
|
||||||
|
|
||||||
running_threads = int64(threads)
|
|
||||||
starttime := time.Now()
|
|
||||||
endtime = starttime.Add(time.Second * time.Duration(duration_secs))
|
|
||||||
for n := 0; n < threads; n++ {
|
|
||||||
go runDownload(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for it to finish
|
|
||||||
for atomic.LoadInt64(&running_threads) > 0 {
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
}
|
|
||||||
download_time := download_finish.Sub(starttime).Seconds()
|
|
||||||
|
|
||||||
bps := float64(uint64(download_count)*object_size) / download_time
|
|
||||||
logit(fmt.Sprintf("Loop %d: GET time %.1f secs, objects = %d, speed = %sB/sec, %.1f operations/sec. Slowdowns = %d",
|
|
||||||
loop, download_time, download_count, bytefmt.ByteSize(uint64(bps)), float64(download_count)/download_time, download_slowdown_count))
|
|
||||||
|
|
||||||
return bps / bytefmt.MEGABYTE
|
|
||||||
}
|
|
||||||
|
|
||||||
func run_delete(loop int) float64 {
|
|
||||||
// reset counters
|
|
||||||
delete_count = 0
|
|
||||||
delete_slowdown_count = 0
|
|
||||||
|
|
||||||
running_threads = int64(threads)
|
|
||||||
starttime := time.Now()
|
|
||||||
for n := 0; n < threads; n++ {
|
|
||||||
go runDelete(n)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for it to finish
|
|
||||||
for atomic.LoadInt64(&running_threads) > 0 {
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
}
|
|
||||||
delete_time := delete_finish.Sub(starttime).Seconds()
|
|
||||||
|
|
||||||
bps := float64(uint64(delete_count)*object_size) / delete_time
|
|
||||||
logit(fmt.Sprintf("Loop %d: DELETE time %.1f secs, %.1f deletes/sec. Slowdowns = %d",
|
|
||||||
loop, delete_time, float64(upload_count)/delete_time, delete_slowdown_count))
|
|
||||||
|
|
||||||
return bps / bytefmt.MEGABYTE
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -419,7 +629,7 @@ func init() {
|
||||||
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
myflag.IntVar(&threads, "t", 1, "Number of threads to run")
|
||||||
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
myflag.IntVar(&loops, "l", 1, "Number of times to repeat test")
|
||||||
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
|
myflag.StringVar(&sizeArg, "z", "1M", "Size of objects in bytes with postfix K, M, and G")
|
||||||
|
myflag.Float64Var(&interval, "ri", 1.0, "Number of seconds between report intervals")
|
||||||
// define custom usage output with notes
|
// define custom usage output with notes
|
||||||
notes :=
|
notes :=
|
||||||
`
|
`
|
||||||
|
@ -471,9 +681,20 @@ NOTES:
|
||||||
log.Fatal("Invalid modes passed to -m, see help for details.")
|
log.Fatal("Invalid modes passed to -m, see help for details.")
|
||||||
}
|
}
|
||||||
var err error
|
var err error
|
||||||
if object_size, err = bytefmt.ToBytes(sizeArg); err != nil {
|
var size uint64
|
||||||
|
if size, err = bytefmt.ToBytes(sizeArg); err != nil {
|
||||||
log.Fatalf("Invalid -z argument for object size: %v", err)
|
log.Fatalf("Invalid -z argument for object size: %v", err)
|
||||||
}
|
}
|
||||||
|
object_size = int64(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
func initData() {
|
||||||
|
// Initialize data for the bucket
|
||||||
|
object_data = make([]byte, object_size)
|
||||||
|
rand.Read(object_data)
|
||||||
|
hasher := md5.New()
|
||||||
|
hasher.Write(object_data)
|
||||||
|
object_data_md5 = base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
@ -494,30 +715,18 @@ func main() {
|
||||||
logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
|
logit(fmt.Sprintf("Parameters: url=%s, bucket_prefix=%s, bucket_count=%d, region=%s, duration=%d, threads=%d, loops=%d, size=%s",
|
||||||
url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg))
|
url_host, bucket_prefix, bucket_count, region, duration_secs, threads, loops, sizeArg))
|
||||||
|
|
||||||
|
// Init Data
|
||||||
|
initData()
|
||||||
|
|
||||||
// Setup the slice of buckets
|
// Setup the slice of buckets
|
||||||
for i := 0; i < bucket_count; i++ {
|
for i := 0; i < bucket_count; i++ {
|
||||||
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
|
buckets = append(buckets, fmt.Sprintf("%s%012d", bucket_prefix, i))
|
||||||
}
|
}
|
||||||
|
|
||||||
var uploadspeed, downloadspeed float64
|
|
||||||
// Loop running the tests
|
// Loop running the tests
|
||||||
for loop := 1; loop <= loops; loop++ {
|
for loop := 0; loop < loops; loop++ {
|
||||||
for _, r := range modes {
|
for _, r := range modes {
|
||||||
switch r {
|
runWrapper(loop, r)
|
||||||
case 'i':
|
|
||||||
init_buckets(loop);
|
|
||||||
case 'p':
|
|
||||||
uploadspeed = run_put(loop);
|
|
||||||
case 'g':
|
|
||||||
downloadspeed = run_get(loop);
|
|
||||||
case 'd':
|
|
||||||
run_delete(loop);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// All done
|
|
||||||
name := strings.Split(strings.TrimPrefix(url_host, "http://"), ".")[0]
|
|
||||||
fmt.Printf("result title: name-concurrency-size, uloadspeed, downloadspeed\n")
|
|
||||||
fmt.Printf("result csv: %v-%v-%v,%.2f,%.2f\n", name, threads, sizeArg, uploadspeed, downloadspeed)
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue