From 2824649c37e9204560d120e4558bfc2af4ddaa2d Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Wed, 12 Aug 2020 14:35:58 +0300 Subject: [PATCH] Implement parallel deletes --- hsbench.go | 74 +++++++++++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/hsbench.go b/hsbench.go index 202fbca..1177ec6 100644 --- a/hsbench.go +++ b/hsbench.go @@ -726,38 +726,48 @@ func runBucketsInit(thread_num int, stats *Stats) { atomic.AddInt64(&running_threads, -1) } -func runBucketsClear(thread_num int, stats *Stats) { +type pagedObject struct { + bucket_num int64 + key string + size int64 +} + +func runPagedList(wg *sync.WaitGroup, bucket_num int64, list chan<- pagedObject) { + svc := s3.New(session.New(), cfg) + svc.ListObjectsPages( + &s3.ListObjectsInput{ + Bucket: &buckets[bucket_num], + MaxKeys: &max_keys, + }, + func(page *s3.ListObjectsOutput, last bool) bool { + for _, v := range page.Contents { + list <- pagedObject{ + bucket_num: bucket_num, + key: *v.Key, + size: *v.Size, + } + } + return true + }) + wg.Done() +} + +func runBucketsClear(list <-chan pagedObject, thread_num int, stats *Stats) { svc := s3.New(session.New(), cfg) for { - bucket_num := atomic.AddInt64(&op_counter, 1) - if bucket_num >= bucket_count { - atomic.AddInt64(&op_counter, -1) - break - } - out, err := svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) + v := <-list + start := time.Now().UnixNano() + _, err := svc.DeleteObject(&s3.DeleteObjectInput{ + Bucket: &buckets[v.bucket_num], + Key: &v.key, + }) + end := time.Now().UnixNano() + stats.updateIntervals(thread_num) if err != nil { break } - n := len(out.Contents) - for n > 0 { - for _, v := range out.Contents { - start := time.Now().UnixNano() - svc.DeleteObject(&s3.DeleteObjectInput{ - Bucket: &buckets[bucket_num], - Key: v.Key, - }) - end := time.Now().UnixNano() - stats.updateIntervals(thread_num) - stats.addOp(thread_num, *v.Size, end-start) - - } - out, err = svc.ListObjects(&s3.ListObjectsInput{Bucket: &buckets[bucket_num]}) - if err != nil { - break - } - n = len(out.Contents) - } + stats.addOp(thread_num, v.size, end-start) } stats.finish(thread_num) atomic.AddInt64(&running_threads, -1) @@ -781,9 +791,17 @@ func runWrapper(loop int, r rune) []OutputStats { case 'c': log.Printf("Running Loop %d BUCKET CLEAR TEST", loop) stats = makeStats(loop, "BCLR", threads, intervalNano) - for n := 0; n < threads; n++ { - go runBucketsClear(n, &stats) + list := make(chan pagedObject, threads*2) + var wg = sync.WaitGroup{} + for b := int64(0); b < bucket_count; b++ { + wg.Add(1) + go runPagedList(&wg, b, list) } + for n := 0; n < threads; n++ { + go runBucketsClear(list, n, &stats) + } + wg.Wait() + close(list) case 'x': log.Printf("Running Loop %d BUCKET DELETE TEST", loop) stats = makeStats(loop, "BDEL", threads, intervalNano)