Merge pull request #74 from klauspost/align-blocks

Split blocks into size divisible by 16, add WithAutoGoroutines
master
Klaus Post 2017-12-10 21:00:05 +01:00 committed by GitHub
commit 6db5e38e85
5 changed files with 112 additions and 8 deletions

View File

@ -24,6 +24,10 @@ go get -u github.com/klauspost/reedsolomon
# Changes
## November 18, 2017
Added [WithAutoGoroutines](https://godoc.org/github.com/klauspost/reedsolomon#WithAutoGoroutines) which will attempt to calculate the optimal number of goroutines to use based on your expected shard size and detected CPU.
## October 1, 2017
* [Cauchy Matrix](https://godoc.org/github.com/klauspost/reedsolomon#WithCauchyMatrix) is now an option. Thanks to [templexxx](https://github.com/templexxx) for the basis of this.
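
For reference, a minimal sketch of how WithAutoGoroutines is passed to New (the shard counts and 1 MiB shard size below are illustrative assumptions, not values from this change):

```go
package main

import (
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// Illustrative values: 10 data + 3 parity shards, 1 MiB shards expected.
	const shardSize = 1 << 20
	enc, err := reedsolomon.New(10, 3, reedsolomon.WithAutoGoroutines(shardSize))
	if err != nil {
		log.Fatal(err)
	}

	// Build shards of the expected size and encode parity into the last 3.
	shards := make([][]byte, 13)
	for i := range shards {
		shards[i] = make([]byte, shardSize)
	}
	if err := enc.Encode(shards); err != nil {
		log.Fatal(err)
	}
}
```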

View File

@ -19,8 +19,35 @@ TEXT ·galMulSSSE3Xor(SB), 7, $0
MOVQ out+72(FP), DX // DX: &out
PSHUFB X5, X8 // X8: lomask (unpacked)
SHRQ $4, R9 // len(in) / 16
MOVQ SI, AX
MOVQ DX, BX
ANDQ $15, AX
ANDQ $15, BX
CMPQ R9, $0
JEQ done_xor
ORQ AX, BX
CMPQ BX, $0
JNZ loopback_xor
loopback_xor_aligned:
MOVOA (SI), X0 // in[x]
MOVOA (DX), X4 // out[x]
MOVOA X0, X1 // in[x]
MOVOA X6, X2 // low copy
MOVOA X7, X3 // high copy
PSRLQ $4, X1 // X1: high input
PAND X8, X0 // X0: low input
PAND X8, X1 // X1: high input
PSHUFB X0, X2 // X2: mul low part
PSHUFB X1, X3 // X3: mul high part
PXOR X2, X3 // X3: Result
PXOR X4, X3 // X3: Result xor existing out
MOVOA X3, (DX) // Store
ADDQ $16, SI // in+=16
ADDQ $16, DX // out+=16
SUBQ $1, R9
JNZ loopback_xor_aligned
JMP done_xor
loopback_xor:
MOVOU (SI), X0 // in[x]
@ -57,15 +84,40 @@ TEXT ·galMulSSSE3(SB), 7, $0
MOVQ in_len+56(FP), R9 // R9: len(in)
MOVQ out+72(FP), DX // DX: &out
PSHUFB X5, X8 // X8: lomask (unpacked)
MOVQ SI, AX
MOVQ DX, BX
SHRQ $4, R9 // len(in) / 16
ANDQ $15, AX
ANDQ $15, BX
CMPQ R9, $0
JEQ done
ORQ AX, BX
CMPQ BX, $0
JNZ loopback
loopback_aligned:
MOVOA (SI), X0 // in[x]
MOVOA X0, X1 // in[x]
MOVOA X6, X2 // low copy
MOVOA X7, X3 // high copy
PSRLQ $4, X1 // X1: high input
PAND X8, X0 // X0: low input
PAND X8, X1 // X1: high input
PSHUFB X0, X2 // X2: mul low part
PSHUFB X1, X3 // X3: mul high part
PXOR X2, X3 // X3: Result
MOVOA X3, (DX) // Store
ADDQ $16, SI // in+=16
ADDQ $16, DX // out+=16
SUBQ $1, R9
JNZ loopback_aligned
JMP done
loopback:
MOVOU (SI), X0 // in[x]
MOVOU X0, X1 // in[x]
MOVOU X6, X2 // low copy
MOVOU X7, X3 // high copy
MOVOA X6, X2 // low copy
MOVOA X7, X3 // high copy
PSRLQ $4, X1 // X1: high input
PAND X8, X0 // X0: low input
PAND X8, X1 // X1: high input
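
The blocks added above, mirrored in both galMulSSSE3Xor and galMulSSSE3, only branch into the new MOVOA (aligned load/store) loops when both the input and output pointers sit on a 16-byte boundary; otherwise control falls through to the existing MOVOU loops. A rough Go rendering of that check (the helper name is made up for illustration):

```go
// Hypothetical Go rendering of the alignment test the assembly performs
// (MOVQ SI,AX / MOVQ DX,BX / ANDQ $15 / ORQ AX,BX / JNZ loopback):
// the MOVOA loop is only safe when both buffers start on a 16-byte boundary.
package galois

import "unsafe"

// canUseAlignedLoop reports whether both non-empty slices are 16-byte aligned.
func canUseAlignedLoop(in, out []byte) bool {
	if len(in) == 0 || len(out) == 0 {
		return false
	}
	a := uintptr(unsafe.Pointer(&in[0]))
	b := uintptr(unsafe.Pointer(&out[0]))
	// OR the low 4 bits of both addresses; zero means both are aligned.
	return (a|b)&15 == 0
}
```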

View File

@ -15,11 +15,12 @@ type options struct {
useAVX2, useSSSE3, useSSE2 bool
usePAR1Matrix bool
useCauchy bool
shardSize int
}
var defaultOptions = options{
maxGoroutines: 384,
minSplitSize: 512,
minSplitSize: 1024,
}
func init() {
@ -46,6 +47,18 @@ func WithMaxGoroutines(n int) Option {
}
}
// WithAutoGoroutines will adjust the number of goroutines for optimal speed with a
// specific shard size.
// Send in the shard size you expect to send. Other shard sizes will work, but may not
// run at the optimal speed.
// Overwrites WithMaxGoroutines.
// If shardSize <= 0, it is ignored.
func WithAutoGoroutines(shardSize int) Option {
return func(o *options) {
o.shardSize = shardSize
}
}
// WithMinSplitSize is the minimum encoding size in bytes per goroutine.
// See WithMaxGoroutines on how jobs are split.
// If n <= 0, it is ignored.

View File

@ -15,7 +15,10 @@ import (
"bytes"
"errors"
"io"
"runtime"
"sync"
"github.com/klauspost/cpuid"
)
// Encoder is an interface to encode Reed-Solomon parity sets for your data.
@ -239,6 +242,33 @@ func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {
if err != nil {
return nil, err
}
if r.o.shardSize > 0 {
cacheSize := cpuid.CPU.Cache.L2
if cacheSize <= 0 {
// Set to 128K if undetectable.
cacheSize = 128 << 10
}
p := runtime.NumCPU()
// 1 input + parity must fit in cache, and we add one more to be safer.
shards := 1 + parityShards
g := (r.o.shardSize * shards) / (cacheSize - (cacheSize >> 4))
if cpuid.CPU.ThreadsPerCore > 1 {
// If multiple threads per core, make sure they don't contend for cache.
g *= cpuid.CPU.ThreadsPerCore
}
g *= 2
if g < p {
g = p
}
// Have g be a multiple of p
g += p - 1
g -= g % p
r.o.maxGoroutines = g
}
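
As a standalone sketch, the heuristic above reads: make one input shard plus all parity shards fit in roughly 15/16 of L2, scale for hyperthreading and a 2x safety factor, then round up to a multiple of the CPU count. The inputs in the example below are assumptions for illustration, not values used by the library:

```go
// A standalone sketch of the goroutine heuristic in New, pulled out for clarity.
package main

import "fmt"

func autoGoroutines(shardSize, parityShards, l2Cache, numCPU, threadsPerCore int) int {
	if l2Cache <= 0 {
		l2Cache = 128 << 10 // fall back to 128 KiB when undetectable
	}
	shards := 1 + parityShards // one input shard plus all parity shards
	// Aim for each goroutine's working set to fit in ~15/16 of L2.
	g := (shardSize * shards) / (l2Cache - (l2Cache >> 4))
	if threadsPerCore > 1 {
		g *= threadsPerCore // hyperthreads share the cache
	}
	g *= 2
	if g < numCPU {
		g = numCPU
	}
	// Round up to a multiple of the CPU count.
	g += numCPU - 1
	g -= g % numCPU
	return g
}

func main() {
	// 1 MiB shards, 3 parity shards, 256 KiB L2, 8 CPUs, 2 threads/core -> 72.
	fmt.Println(autoGoroutines(1<<20, 3, 256<<10, 8, 2))
}
```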
// Inverted matrices are cached in a tree keyed by the indices
// of the invalid rows of the data to reconstruct.
@ -431,6 +461,8 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu
if do < r.o.minSplitSize {
do = r.o.minSplitSize
}
// Make sizes divisible by 16
do = (do + 15) & (^15)
start := 0
for start < byteCount {
if start+do > byteCount {
@ -490,6 +522,8 @@ func (r reedSolomon) checkSomeShardsP(matrixRows, inputs, toCheck [][]byte, outp
if do < r.o.minSplitSize {
do = r.o.minSplitSize
}
// Make sizes divisible by 16
do = (do + 15) & (^15)
start := 0
for start < byteCount {
if start+do > byteCount {
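
The `(do + 15) & (^15)` expression rounds each per-goroutine chunk up to the next multiple of 16, so the split points line up with the 16-byte blocks the SSSE3 loops consume. A quick check of the trick:

```go
// Small demonstration of the round-up-to-16 expression used above.
package main

import "fmt"

func roundUp16(n int) int {
	return (n + 15) & (^15)
}

func main() {
	for _, n := range []int{1, 15, 16, 17, 1000} {
		fmt.Println(n, "->", roundUp16(n)) // 16, 16, 16, 32, 1008
	}
}
```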

View File

@ -113,6 +113,7 @@ func testOpts() [][]Option {
{WithMaxGoroutines(5000), WithMinSplitSize(50), withSSE3(false), withAVX2(false)},
{WithMaxGoroutines(5000), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)},
{WithMaxGoroutines(1), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)},
{WithAutoGoroutines(50000), WithMinSplitSize(500)},
}
for _, o := range opts[:] {
if defaultOptions.useSSSE3 {
@ -655,7 +656,7 @@ func fillRandom(p []byte) {
}
func benchmarkEncode(b *testing.B, dataShards, parityShards, shardSize int) {
r, err := New(dataShards, parityShards)
r, err := New(dataShards, parityShards, WithAutoGoroutines(shardSize))
if err != nil {
b.Fatal(err)
}
@ -722,7 +723,7 @@ func BenchmarkEncode17x3x16M(b *testing.B) {
}
func benchmarkVerify(b *testing.B, dataShards, parityShards, shardSize int) {
r, err := New(dataShards, parityShards)
r, err := New(dataShards, parityShards, WithAutoGoroutines(shardSize))
if err != nil {
b.Fatal(err)
}
@ -793,7 +794,7 @@ func corruptRandom(shards [][]byte, dataShards, parityShards int) {
}
func benchmarkReconstruct(b *testing.B, dataShards, parityShards, shardSize int) {
r, err := New(dataShards, parityShards)
r, err := New(dataShards, parityShards, WithAutoGoroutines(shardSize))
if err != nil {
b.Fatal(err)
}
@ -873,7 +874,7 @@ func corruptRandomData(shards [][]byte, dataShards, parityShards int) {
}
func benchmarkReconstructData(b *testing.B, dataShards, parityShards, shardSize int) {
r, err := New(dataShards, parityShards)
r, err := New(dataShards, parityShards, WithAutoGoroutines(shardSize))
if err != nil {
b.Fatal(err)
}
@ -939,7 +940,7 @@ func BenchmarkReconstructData10x4x16M(b *testing.B) {
}
func benchmarkReconstructP(b *testing.B, dataShards, parityShards, shardSize int) {
r, err := New(dataShards, parityShards)
r, err := New(dataShards, parityShards, WithAutoGoroutines(shardSize))
if err != nil {
b.Fatal(err)
}