commit
c709ee84e7
125
s3-benchmark.go
125
s3-benchmark.go
|
@ -96,9 +96,6 @@ func getS3Client() *s3.S3 {
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBucket(bucket_num int, ignore_errors bool) {
|
func createBucket(bucket_num int, ignore_errors bool) {
|
||||||
// Get a client
|
|
||||||
// client := getS3Client()
|
|
||||||
// Create our bucket (may already exist without error)
|
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
log.Printf(buckets[bucket_num])
|
log.Printf(buckets[bucket_num])
|
||||||
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
in := &s3.CreateBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
||||||
|
@ -116,13 +113,7 @@ func createBucket(bucket_num int, ignore_errors bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func deleteAllObjects(bucket_num int) {
|
func deleteAllObjects(bucket_num int) {
|
||||||
// Get a client
|
|
||||||
// client := getS3Client()
|
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
// in := &s3.DeleteBucketInput{Bucket: aws.String(buckets[bucket_num])}
|
|
||||||
// if _, err := svc.DeleteBucket(in); err != nil {
|
|
||||||
// log.Printf("FATAL: Unable to delete bucket %s : %v", buckets[bucket_num], err)
|
|
||||||
// }
|
|
||||||
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal("can't list objects")
|
log.Fatal("can't list objects")
|
||||||
|
@ -144,56 +135,6 @@ func deleteAllObjects(bucket_num int) {
|
||||||
log.Fatal("can't list objects")
|
log.Fatal("can't list objects")
|
||||||
}
|
}
|
||||||
fmt.Printf("after delete, got %v objects\n", len(out.Contents))
|
fmt.Printf("after delete, got %v objects\n", len(out.Contents))
|
||||||
|
|
||||||
// // Use multiple routines to do the actual delete
|
|
||||||
// var doneDeletes sync.WaitGroup
|
|
||||||
// // Loop deleting our versions reading as big a list as we can
|
|
||||||
// var keyMarker, versionId *string
|
|
||||||
// var err error
|
|
||||||
// for loop := 1; ; loop++ {
|
|
||||||
// // Delete all the existing objects and versions in the bucket
|
|
||||||
// in := &s3.ListObjectVersionsInput{Bucket: aws.String(bucket), KeyMarker: keyMarker, VersionIdMarker: versionId, MaxKeys: aws.Int64(1000)}
|
|
||||||
// if listVersions, listErr := client.ListObjectVersions(in); listErr == nil {
|
|
||||||
// delete := &s3.Delete{Quiet: aws.Bool(true)}
|
|
||||||
// for _, version := range listVersions.Versions {
|
|
||||||
// delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: version.Key, VersionId: version.VersionId})
|
|
||||||
// }
|
|
||||||
// for _, marker := range listVersions.DeleteMarkers {
|
|
||||||
// delete.Objects = append(delete.Objects, &s3.ObjectIdentifier{Key: marker.Key, VersionId: marker.VersionId})
|
|
||||||
// }
|
|
||||||
// if len(delete.Objects) > 0 {
|
|
||||||
// // Start a delete routine
|
|
||||||
// doDelete := func(bucket string, delete *s3.Delete) {
|
|
||||||
// if _, e := client.DeleteObjects(&s3.DeleteObjectsInput{Bucket: aws.String(bucket), Delete: delete}); e != nil {
|
|
||||||
// err = fmt.Errorf("DeleteObjects unexpected failure: %s", e.Error())
|
|
||||||
// }
|
|
||||||
// doneDeletes.Done()
|
|
||||||
// }
|
|
||||||
// doneDeletes.Add(1)
|
|
||||||
// go doDelete(bucket, delete)
|
|
||||||
// }
|
|
||||||
// // Advance to next versions
|
|
||||||
// if listVersions.IsTruncated == nil || !*listVersions.IsTruncated {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// keyMarker = listVersions.NextKeyMarker
|
|
||||||
// versionId = listVersions.NextVersionIdMarker
|
|
||||||
// } else {
|
|
||||||
// // The bucket may not exist, just ignore in that case
|
|
||||||
// if strings.HasPrefix(listErr.Error(), "NoSuchBucket") {
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// err = fmt.Errorf("ListObjectVersions unexpected failure: %v", listErr)
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// // Wait for deletes to finish
|
|
||||||
// doneDeletes.Wait()
|
|
||||||
|
|
||||||
// If error, it is fatal
|
|
||||||
// if err != nil {
|
|
||||||
// log.Fatalf("FATAL: Unable to delete objects from bucket: %v", err)
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// canonicalAmzHeaders -- return the x-amz headers canonicalized
|
// canonicalAmzHeaders -- return the x-amz headers canonicalized
|
||||||
|
@ -272,26 +213,6 @@ func runUpload(thread_num int, keys *sync.Map) {
|
||||||
}
|
}
|
||||||
keys.Store(key, nil)
|
keys.Store(key, nil)
|
||||||
fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key)
|
fmt.Fprintf(os.Stderr, "upload thread %v, %v\r", thread_num, key)
|
||||||
|
|
||||||
// req, _ := http.NewRequest("PUT", prefix, fileobj)
|
|
||||||
// req.Header.Set("Content-Length", strconv.FormatUint(object_size, 10))
|
|
||||||
// req.Header.Set("Content-MD5", object_data_md5)
|
|
||||||
// setSignature(req)
|
|
||||||
// if resp, err := httpClient.Do(req); err != nil {
|
|
||||||
// log.Fatalf("FATAL: Error uploading object %s: %v", prefix, err)
|
|
||||||
// } else if resp != nil && resp.StatusCode != http.StatusOK {
|
|
||||||
// if resp.StatusCode == http.StatusServiceUnavailable {
|
|
||||||
// atomic.AddInt32(&upload_slowdown_count, 1)
|
|
||||||
// atomic.AddInt32(&upload_count, -1)
|
|
||||||
// } else {
|
|
||||||
// fmt.Printf("Upload status %s: resp: %+v\n", resp.Status, resp)
|
|
||||||
// if resp.Body != nil {
|
|
||||||
// body, _ := ioutil.ReadAll(resp.Body)
|
|
||||||
// fmt.Printf("Body: %s\n", string(body))
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
}
|
}
|
||||||
// Remember last done time
|
// Remember last done time
|
||||||
upload_finish = time.Now()
|
upload_finish = time.Now()
|
||||||
|
@ -305,13 +226,6 @@ func runDownload(thread_num int, keys *sync.Map) {
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
|
|
||||||
keys.Range(func(k, value interface{}) bool {
|
keys.Range(func(k, value interface{}) bool {
|
||||||
|
|
||||||
// objnum := atomic.AddInt32(&delete_count, 1)
|
|
||||||
// if objnum > upload_count {
|
|
||||||
// delete_count = 0
|
|
||||||
// }
|
|
||||||
// key := fmt.Sprintf("Object-%d", objnum)
|
|
||||||
|
|
||||||
if time.Now().After(endtime) {
|
if time.Now().After(endtime) {
|
||||||
// fmt.Println("time ended for download")
|
// fmt.Println("time ended for download")
|
||||||
return false
|
return false
|
||||||
|
@ -345,21 +259,6 @@ func runDownload(thread_num int, keys *sync.Map) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
atomic.AddInt32(&download_count, 1)
|
atomic.AddInt32(&download_count, 1)
|
||||||
|
|
||||||
// prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
|
||||||
// req, _ := http.NewRequest("GET", prefix, nil)
|
|
||||||
// setSignature(req)
|
|
||||||
// if resp, err := httpClient.Do(req); err != nil {
|
|
||||||
// log.Fatalf("FATAL: Error downloading object %s: %v", prefix, err)
|
|
||||||
// } else if resp != nil && resp.Body != nil {
|
|
||||||
// if resp.StatusCode == http.StatusServiceUnavailable {
|
|
||||||
// atomic.AddInt32(&download_slowdown_count, 1)
|
|
||||||
// atomic.AddInt32(&download_count, -1)
|
|
||||||
// } else {
|
|
||||||
// io.Copy(ioutil.Discard, resp.Body)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -375,13 +274,6 @@ func runDelete(thread_num int, keys *sync.Map) {
|
||||||
svc := s3.New(session.New(), cfg)
|
svc := s3.New(session.New(), cfg)
|
||||||
|
|
||||||
keys.Range(func(k, value interface{}) bool {
|
keys.Range(func(k, value interface{}) bool {
|
||||||
|
|
||||||
// objnum := atomic.AddInt32(&delete_count, 1)
|
|
||||||
// if objnum > upload_count {
|
|
||||||
// break
|
|
||||||
// }
|
|
||||||
// key := fmt.Sprintf("Object-%d", objnum)
|
|
||||||
|
|
||||||
var key string
|
var key string
|
||||||
var ok bool
|
var ok bool
|
||||||
if key, ok = k.(string); !ok {
|
if key, ok = k.(string); !ok {
|
||||||
|
@ -408,17 +300,6 @@ func runDelete(thread_num int, keys *sync.Map) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
fmt.Fprintf(os.Stderr, "delete thread %v, %v\r", thread_num, key)
|
||||||
|
|
||||||
// prefix := fmt.Sprintf("%s/%s/Object-%d", url_host, bucket, objnum)
|
|
||||||
// req, _ := http.NewRequest("DELETE", prefix, nil)
|
|
||||||
// setSignature(req)
|
|
||||||
// if resp, err := httpClient.Do(req); err != nil {
|
|
||||||
// log.Fatalf("FATAL: Error deleting object %s: %v", prefix, err)
|
|
||||||
// } else if resp != nil && resp.StatusCode == http.StatusServiceUnavailable {
|
|
||||||
// atomic.AddInt32(&delete_slowdown_count, 1)
|
|
||||||
// atomic.AddInt32(&delete_count, -1)
|
|
||||||
// }
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -529,12 +410,6 @@ func main() {
|
||||||
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
|
loop, upload_time, upload_count, bytefmt.ByteSize(uint64(bps)), float64(upload_count)/upload_time, upload_slowdown_count))
|
||||||
|
|
||||||
uploadspeed = bps / bytefmt.MEGABYTE
|
uploadspeed = bps / bytefmt.MEGABYTE
|
||||||
// count := 0
|
|
||||||
// keys.Range(func(k, value interface{}) bool {
|
|
||||||
// count++
|
|
||||||
// return true
|
|
||||||
// })
|
|
||||||
// fmt.Println("map got ", count)
|
|
||||||
|
|
||||||
// Run the download case
|
// Run the download case
|
||||||
running_threads = int32(threads)
|
running_threads = int32(threads)
|
||||||
|
|
Loading…
Reference in New Issue