Make single goroutine encodes more efficient (#122)

Calculate the optimal per-round size to keep working data in cache when not using WithAutoGoroutines.

```
λ benchcmp before.txt after.txt
benchmark                          old ns/op     new ns/op     delta
BenchmarkParallel_8x8x05M-16       675225        321053        -52.45%
BenchmarkParallel_20x10x05M-16     3471988       600740        -82.70%
BenchmarkParallel_8x8x1M-16        3948606       728093        -81.56%
BenchmarkParallel_8x8x8M-16        47361588      5976467       -87.38%
BenchmarkParallel_8x8x32M-16       195044200     24365474      -87.51%

benchmark                          old MB/s     new MB/s     speedup
BenchmarkParallel_8x8x05M-16       6211.71      13064.22     2.10x
BenchmarkParallel_20x10x05M-16     3020.10      17454.73     5.78x
BenchmarkParallel_8x8x1M-16        2124.45      11521.34     5.42x
BenchmarkParallel_8x8x8M-16        1416.95      11228.85     7.92x
BenchmarkParallel_8x8x32M-16       1376.28      11017.04     8.00x

```
Branch: master
Klaus Post, 2020-05-03 19:37:22 +02:00 (committed by GitHub)
parent 0b98f5350a, commit 65df535980
4 changed files with 109 additions and 67 deletions
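
The core idea behind the change: instead of streaming whole shards through the Galois-field kernels (which evicts everything from cache between passes), each round only touches a slice small enough that one input plus all parity outputs stay L2-resident. A standalone sketch of how such a per-round size can be derived; `perRoundSize` and its parameters are illustrative stand-ins, not the library's API:

```
package main

import "fmt"

// perRoundSize mirrors the sizing approach in this commit (a sketch, not the
// actual implementation): budget the L2 cache across 1 input slice plus one
// output slice per parity shard, align to 64-byte cache lines, and never go
// below a minimum split size.
func perRoundSize(l2CacheBytes, parityShards, minSplitSize int) int {
	if l2CacheBytes <= 0 {
		l2CacheBytes = 128 << 10 // fall back to 128 KiB when undetectable
	}
	per := l2CacheBytes / (1 + parityShards) // 1 input + parity must fit
	per = ((per + 63) / 64) * 64             // align to 64 bytes
	if per < minSplitSize {
		per = minSplitSize
	}
	return per
}

func main() {
	// Example: 256 KiB L2 and 8 parity shards gives roughly 28.5 KiB per round.
	fmt.Println(perRoundSize(256<<10, 8, 512))
}
```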

@@ -311,42 +311,3 @@ func TestCodeSomeShardsAvx512_ManyxMany(t *testing.T) {
 		}
 	}
 }
-func benchmarkParallel(b *testing.B, dataShards, parityShards, shardSize int) {
-	if !(defaultOptions.useAVX512 || defaultOptions.useAVX2) {
-		return
-	}
-	r, err := New(dataShards, parityShards, testOptions(WithAutoGoroutines(shardSize))...)
-	if err != nil {
-		b.Fatal(err)
-	}
-	shards := make([][]byte, dataShards+parityShards)
-	for s := range shards {
-		shards[s] = make([]byte, shardSize)
-	}
-	rand.Seed(0)
-	for s := 0; s < dataShards; s++ {
-		fillRandom(shards[s])
-	}
-	b.SetBytes(int64(shardSize * dataShards))
-	b.ReportAllocs()
-	b.ResetTimer()
-	b.RunParallel(func(pb *testing.PB) {
-		for pb.Next() {
-			err = r.Encode(shards)
-			if err != nil {
-				b.Fatal(err)
-			}
-		}
-	})
-}
-func BenchmarkParallel_8x8x05M(b *testing.B) { benchmarkParallel(b, 8, 8, 1*1024*1024/2) }
-func BenchmarkParallel_8x8x1M(b *testing.B)  { benchmarkParallel(b, 8, 8, 1*1024*1024) }
-func BenchmarkParallel_8x8x8M(b *testing.B)  { benchmarkParallel(b, 8, 8, 8*1024*1024) }
-func BenchmarkParallel_8x8x32M(b *testing.B) { benchmarkParallel(b, 8, 8, 32*1024*1024) }

@@ -16,6 +16,7 @@ type options struct {
 	usePAR1Matrix bool
 	useCauchy     bool
 	shardSize     int
+	perRound      int
 }

 var defaultOptions = options{

@@ -242,23 +242,31 @@ func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {
 	if err != nil {
 		return nil, err
 	}
+	// Calculate what we want per round
+	r.o.perRound = cpuid.CPU.Cache.L2
+	if r.o.perRound <= 0 {
+		// Set to 128K if undetectable.
+		r.o.perRound = 128 << 10
+	}
+	if cpuid.CPU.ThreadsPerCore > 1 && r.o.maxGoroutines > cpuid.CPU.PhysicalCores {
+		// If multiple threads per core, make sure they don't contend for cache.
+		r.o.perRound /= cpuid.CPU.ThreadsPerCore
+	}
+	// 1 input + parity must fit in cache, and we add one more to be safer.
+	r.o.perRound = r.o.perRound / (1 + parityShards)
+	// Align to 64 bytes.
+	r.o.perRound = ((r.o.perRound + 63) / 64) * 64
+	if r.o.perRound < r.o.minSplitSize {
+		r.o.perRound = r.o.minSplitSize
+	}
 	if r.o.shardSize > 0 {
-		cacheSize := cpuid.CPU.Cache.L2
-		if cacheSize <= 0 {
-			// Set to 128K if undetectable.
-			cacheSize = 128 << 10
-		}
 		p := runtime.NumCPU()
-		// 1 input + parity must fit in cache, and we add one more to be safer.
-		shards := 1 + parityShards
-		g := (r.o.shardSize * shards) / (cacheSize - (cacheSize >> 4))
-		if cpuid.CPU.ThreadsPerCore > 1 {
-			// If multiple threads per core, make sure they don't contend for cache.
-			g *= cpuid.CPU.ThreadsPerCore
-		}
+		g := r.o.shardSize / r.o.perRound
 		g *= 2
 		if g < p {
 			g = p
 		}
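
With `perRound` pinned to the cache budget, the goroutine count for a known shard size follows directly: one cache-sized round per goroutine, doubled for slack, clamped to at least the CPU count. A hedged sketch of that arithmetic (`goroutineCount` is a stand-in; the real `New` also applies `minSplitSize` and option overrides):

```
package main

import (
	"fmt"
	"runtime"
)

// goroutineCount sketches the sizing above: split the shard into
// perRound-sized pieces, double for slack, and keep at least one
// goroutine per CPU.
func goroutineCount(shardSize, perRound int) int {
	p := runtime.NumCPU()
	g := shardSize / perRound
	g *= 2 // overprovision so uneven finish times don't idle cores
	if g < p {
		g = p
	}
	return g
}

func main() {
	fmt.Println(goroutineCount(8<<20, 28<<10)) // e.g. ~584 for 8 MiB shards
}
```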
@@ -457,15 +465,28 @@ func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, output
 		r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount)
 		return
 	}
-	for c := 0; c < r.DataShards; c++ {
-		in := inputs[c]
-		for iRow := 0; iRow < outputCount; iRow++ {
-			if c == 0 {
-				galMulSlice(matrixRows[iRow][c], in, outputs[iRow], &r.o)
-			} else {
-				galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], &r.o)
+	// Process using no goroutines
+	start, end := 0, r.o.perRound
+	if end > len(inputs[0]) {
+		end = len(inputs[0])
+	}
+	for start < len(inputs[0]) {
+		for c := 0; c < r.DataShards; c++ {
+			in := inputs[c][start:end]
+			for iRow := 0; iRow < outputCount; iRow++ {
+				if c == 0 {
+					galMulSlice(matrixRows[iRow][c], in, outputs[iRow][start:end], &r.o)
+				} else {
+					galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow][start:end], &r.o)
+				}
 			}
 		}
+		start = end
+		end += r.o.perRound
+		if end > len(inputs[0]) {
+			end = len(inputs[0])
+		}
 	}
 }
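
The reshaped single-goroutine path is a sliding window over the shards: every data and parity row is processed for one `perRound`-sized window before the window advances. Stripped of the Galois-field kernels, the loop shape looks like this (standalone sketch with a stand-in callback):

```
package main

import "fmt"

// forEachRound visits data in perRound-sized windows, mirroring the loop
// structure in codeSomeShards: repeated passes over one window stay in
// cache before the window advances.
func forEachRound(length, perRound int, process func(start, end int)) {
	start, end := 0, perRound
	if end > length {
		end = length
	}
	for start < length {
		process(start, end)
		start = end
		end += perRound
		if end > length {
			end = length
		}
	}
}

func main() {
	forEachRound(10, 4, func(s, e int) { fmt.Println(s, e) }) // 0 4, 4 8, 8 10
}
```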
@@ -484,17 +505,29 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu
 		if start+do > byteCount {
 			do = byteCount - start
 		}
 		wg.Add(1)
 		go func(start, stop int) {
-			for c := 0; c < r.DataShards; c++ {
-				in := inputs[c][start:stop]
-				for iRow := 0; iRow < outputCount; iRow++ {
-					if c == 0 {
-						galMulSlice(matrixRows[iRow][c], in, outputs[iRow][start:stop], &r.o)
-					} else {
-						galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow][start:stop], &r.o)
+			lstart, lstop := start, start+r.o.perRound
+			if lstop > stop {
+				lstop = stop
+			}
+			for lstart < stop {
+				for c := 0; c < r.DataShards; c++ {
+					in := inputs[c][lstart:lstop]
+					for iRow := 0; iRow < outputCount; iRow++ {
+						if c == 0 {
+							galMulSlice(matrixRows[iRow][c], in, outputs[iRow][lstart:lstop], &r.o)
+						} else {
+							galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow][lstart:lstop], &r.o)
+						}
 					}
 				}
+				lstart = lstop
+				lstop += r.o.perRound
+				if lstop > stop {
+					lstop = stop
+				}
 			}
 			wg.Done()
 		}(start, start+do)
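
The parallel path keeps its original coarse split, where each goroutine owns one contiguous [start, stop) range, but now subdivides that range into the same `perRound` windows. The outer split still controls load balancing across goroutines; the inner windows control cache residency, so the speedup applies to the multi-goroutine path as well.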

@@ -1436,3 +1436,50 @@ func benchmarkSplit(b *testing.B, shards, parity, dataSize int) {
 		}
 	}
 }
+
+func benchmarkParallel(b *testing.B, dataShards, parityShards, shardSize int) {
+	// Run max 1 goroutine per operation.
+	r, err := New(dataShards, parityShards, testOptions(WithMaxGoroutines(1))...)
+	if err != nil {
+		b.Fatal(err)
+	}
+	c := runtime.GOMAXPROCS(0)
+
+	// Note that concurrency also affects total data size and will make caches less effective.
+	b.Log("Total data:", (c*dataShards*shardSize)>>20, "MiB", "parity:", (c*parityShards*shardSize)>>20, "MiB")
+
+	// Create independent shards
+	shardsCh := make(chan [][]byte, c)
+	for i := 0; i < c; i++ {
+		rand.Seed(int64(i))
+		shards := make([][]byte, dataShards+parityShards)
+		for s := range shards {
+			shards[s] = make([]byte, shardSize)
+		}
+		for s := 0; s < dataShards; s++ {
+			fillRandom(shards[s])
+		}
+		shardsCh <- shards
+	}
+
+	b.SetBytes(int64(shardSize * dataShards))
+	b.SetParallelism(c)
+	b.ReportAllocs()
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			shards := <-shardsCh
+			err = r.Encode(shards)
+			if err != nil {
+				b.Fatal(err)
+			}
+			shardsCh <- shards
+		}
+	})
+}
+
+func BenchmarkParallel_8x8x05M(b *testing.B)   { benchmarkParallel(b, 8, 8, 512<<10) }
+func BenchmarkParallel_20x10x05M(b *testing.B) { benchmarkParallel(b, 20, 10, 512<<10) }
+func BenchmarkParallel_8x8x1M(b *testing.B)    { benchmarkParallel(b, 8, 8, 1<<20) }
+func BenchmarkParallel_8x8x8M(b *testing.B)    { benchmarkParallel(b, 8, 8, 8<<20) }
+func BenchmarkParallel_8x8x32M(b *testing.B)   { benchmarkParallel(b, 8, 8, 32<<20) }
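
For context, a minimal usage sketch of the configuration these benchmarks exercise: an encoder capped at one goroutine per `Encode` call, the path this commit makes efficient (8 data + 8 parity shards of 512 KiB, matching the 8x8x05M case above):

```
package main

import (
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// One goroutine per Encode; the per-round cache sizing does the heavy lifting.
	enc, err := reedsolomon.New(8, 8, reedsolomon.WithMaxGoroutines(1))
	if err != nil {
		log.Fatal(err)
	}
	shards := make([][]byte, 16) // 8 data + 8 parity
	for i := range shards {
		shards[i] = make([]byte, 512<<10) // 512 KiB per shard
	}
	if err := enc.Encode(shards); err != nil {
		log.Fatal(err)
	}
	log.Println("encoded", len(shards), "shards")
}
```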