Add options (#46)

* Add options

Make constants changeable as options.

The API remains backwards compatible.

* Update documentation.

* Fix line endings

* fmt

* fmt

* Use functions for parameters.

Much neater.
master
Klaus Post 2017-02-19 11:13:22 +01:00 committed by GitHub
parent c056598956
commit 5abf0ee302
9 changed files with 210 additions and 46 deletions

View File

@ -7,11 +7,11 @@ os:
- osx - osx
go: go:
- 1.3
- 1.4
- 1.5 - 1.5
- 1.6 - 1.6
- tip - 1.7
- 1.8
- master
install: install:
- go get ./... - go get ./...
@ -26,3 +26,8 @@ script:
- go build examples/stream-decoder.go - go build examples/stream-decoder.go
- go build examples/stream-encoder.go - go build examples/stream-encoder.go
- diff <(gofmt -d .) <("") - diff <(gofmt -d .) <("")
matrix:
allow_failures:
- go: 'master'
fast_finish: true

View File

@ -162,6 +162,18 @@ There is no buffering or timeouts/retry specified. If you want to add that, you
For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples). For complete examples of a streaming encoder and decoder see the [examples folder](https://github.com/klauspost/reedsolomon/tree/master/examples).
# Advanced Options
You can modify internal options which affect how jobs are split between and processed by goroutines.
To create options, use the WithXXX functions. You can supply options to `New`, `NewStream` and `NewStreamC`. If no options are supplied, default options are used.
Example of how to supply options:
```Go
enc, err := reedsolomon.New(10, 3, WithMaxGoroutines(25))
```
# Performance # Performance
Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time. Performance depends mainly on the number of parity shards. In rough terms, doubling the number of parity shards will double the encoding time.

View File

@ -5,10 +5,6 @@
package reedsolomon package reedsolomon
import (
"github.com/klauspost/cpuid"
)
//go:noescape //go:noescape
func galMulSSSE3(low, high, in, out []byte) func galMulSSSE3(low, high, in, out []byte)
@ -40,12 +36,12 @@ func galMulSSSE3Xor(low, high, in, out []byte) {
} }
*/ */
func galMulSlice(c byte, in, out []byte) { func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) {
var done int var done int
if cpuid.CPU.AVX2() { if avx2 {
galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out) galMulAVX2(mulTableLow[c][:], mulTableHigh[c][:], in, out)
done = (len(in) >> 5) << 5 done = (len(in) >> 5) << 5
} else if cpuid.CPU.SSSE3() { } else if ssse3 {
galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out) galMulSSSE3(mulTableLow[c][:], mulTableHigh[c][:], in, out)
done = (len(in) >> 4) << 4 done = (len(in) >> 4) << 4
} }
@ -58,12 +54,12 @@ func galMulSlice(c byte, in, out []byte) {
} }
} }
func galMulSliceXor(c byte, in, out []byte) { func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) {
var done int var done int
if cpuid.CPU.AVX2() { if avx2 {
galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) galMulAVX2Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)
done = (len(in) >> 5) << 5 done = (len(in) >> 5) << 5
} else if cpuid.CPU.SSSE3() { } else if ssse3 {
galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out) galMulSSSE3Xor(mulTableLow[c][:], mulTableHigh[c][:], in, out)
done = (len(in) >> 4) << 4 done = (len(in) >> 4) << 4
} }

View File

@ -4,14 +4,14 @@
package reedsolomon package reedsolomon
func galMulSlice(c byte, in, out []byte) { func galMulSlice(c byte, in, out []byte, ssse3, avx2 bool) {
mt := mulTable[c] mt := mulTable[c]
for n, input := range in { for n, input := range in {
out[n] = mt[input] out[n] = mt[input]
} }
} }
func galMulSliceXor(c byte, in, out []byte) { func galMulSliceXor(c byte, in, out []byte, ssse3, avx2 bool) {
mt := mulTable[c] mt := mulTable[c]
for n, input := range in { for n, input := range in {
out[n] ^= mt[input] out[n] ^= mt[input]

View File

@ -131,13 +131,13 @@ func TestGalois(t *testing.T) {
// Test slices (>16 entries to test assembler) // Test slices (>16 entries to test assembler)
in := []byte{0, 1, 2, 3, 4, 5, 6, 10, 50, 100, 150, 174, 201, 255, 99, 32, 67, 85} in := []byte{0, 1, 2, 3, 4, 5, 6, 10, 50, 100, 150, 174, 201, 255, 99, 32, 67, 85}
out := make([]byte, len(in)) out := make([]byte, len(in))
galMulSlice(25, in, out) galMulSlice(25, in, out, false, false)
expect := []byte{0x0, 0x19, 0x32, 0x2b, 0x64, 0x7d, 0x56, 0xfa, 0xb8, 0x6d, 0xc7, 0x85, 0xc3, 0x1f, 0x22, 0x7, 0x25, 0xfe} expect := []byte{0x0, 0x19, 0x32, 0x2b, 0x64, 0x7d, 0x56, 0xfa, 0xb8, 0x6d, 0xc7, 0x85, 0xc3, 0x1f, 0x22, 0x7, 0x25, 0xfe}
if 0 != bytes.Compare(out, expect) { if 0 != bytes.Compare(out, expect) {
t.Errorf("got %#v, expected %#v", out, expect) t.Errorf("got %#v, expected %#v", out, expect)
} }
galMulSlice(177, in, out) galMulSlice(177, in, out, false, false)
expect = []byte{0x0, 0xb1, 0x7f, 0xce, 0xfe, 0x4f, 0x81, 0x9e, 0x3, 0x6, 0xe8, 0x75, 0xbd, 0x40, 0x36, 0xa3, 0x95, 0xcb} expect = []byte{0x0, 0xb1, 0x7f, 0xce, 0xfe, 0x4f, 0x81, 0x9e, 0x3, 0x6, 0xe8, 0x75, 0xbd, 0x40, 0x36, 0xa3, 0x95, 0xcb}
if 0 != bytes.Compare(out, expect) { if 0 != bytes.Compare(out, expect) {
t.Errorf("got %#v, expected %#v", out, expect) t.Errorf("got %#v, expected %#v", out, expect)

67
options.go Normal file
View File

@ -0,0 +1,67 @@
package reedsolomon
import (
"runtime"
"github.com/klauspost/cpuid"
)
// Option allows to override processing parameters.
type Option func(*options)

// options holds the tunable processing parameters that the With*
// functions modify. Every encoder carries its own copy.
type options struct {
	maxGoroutines int // maximum goroutines used for encoding/decoding a job
	minSplitSize  int // minimum number of bytes each goroutine should process

	// SIMD code-path toggles; initialized from CPU detection in init().
	useAVX2, useSSSE3 bool
}

// defaultOptions is the baseline copied into every new encoder before
// user-supplied Options are applied. init() below adjusts it to the host.
var defaultOptions = options{
	maxGoroutines: 50,
	minSplitSize:  512,
}
// init tunes defaultOptions to the host: a single-CPU process gets no
// goroutine fan-out, and SIMD paths are enabled per detected CPU features.
func init() {
	if runtime.GOMAXPROCS(0) <= 1 {
		// No parallelism available; splitting work would only add overhead.
		defaultOptions.maxGoroutines = 1
	}
	// Detect CPU capabilities.
	defaultOptions.useSSSE3 = cpuid.CPU.SSSE3()
	defaultOptions.useAVX2 = cpuid.CPU.AVX2()
}
// WithMaxGoroutines sets the maximum number of goroutines used for
// encoding and decoding. Jobs will be split into up to this many parts,
// unless each goroutine would have to process less than minSplitSize
// bytes (set with WithMinSplitSize).
// For the best speed, keep this well above the GOMAXPROCS number for
// more fine-grained scheduling.
// Values of n <= 0 are ignored.
func WithMaxGoroutines(n int) Option {
	return func(o *options) {
		if n <= 0 {
			return
		}
		o.maxGoroutines = n
	}
}
// WithMinSplitSize is the minimum encoding size in bytes per goroutine.
// See WithMaxGoroutines on how jobs are split.
// If n <= 0, it is ignored.
func WithMinSplitSize(n int) Option {
	return func(o *options) {
		if n > 0 {
			// Bug fix: this previously assigned o.maxGoroutines, so the
			// option clobbered the goroutine limit and never changed the
			// split size.
			o.minSplitSize = n
		}
	}
}
// withSSE3 forces the SSSE3 code path on or off, overriding CPU detection.
// Unexported: used by tests to exercise both the SIMD and generic paths.
func withSSE3(enabled bool) Option {
	return func(o *options) {
		o.useSSSE3 = enabled
	}
}
// withAVX2 forces the AVX2 code path on or off, overriding CPU detection.
// Unexported: used by tests to exercise both the SIMD and generic paths.
func withAVX2(enabled bool) Option {
	return func(o *options) {
		o.useAVX2 = enabled
	}
}

View File

@ -15,7 +15,6 @@ import (
"bytes" "bytes"
"errors" "errors"
"io" "io"
"runtime"
"sync" "sync"
) )
@ -83,6 +82,7 @@ type reedSolomon struct {
m matrix m matrix
tree inversionTree tree inversionTree
parity [][]byte parity [][]byte
o options
} }
// ErrInvShardNum will be returned by New, if you attempt to create // ErrInvShardNum will be returned by New, if you attempt to create
@ -98,13 +98,18 @@ var ErrMaxShardNum = errors.New("cannot create Encoder with 255 or more data+par
// the number of data shards and parity shards that // the number of data shards and parity shards that
// you want to use. You can reuse this encoder. // you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256. // Note that the maximum number of data shards is 256.
func New(dataShards, parityShards int) (Encoder, error) { // If no options are supplied, default options are used.
func New(dataShards, parityShards int, opts ...Option) (Encoder, error) {
r := reedSolomon{ r := reedSolomon{
DataShards: dataShards, DataShards: dataShards,
ParityShards: parityShards, ParityShards: parityShards,
Shards: dataShards + parityShards, Shards: dataShards + parityShards,
o: defaultOptions,
} }
for _, opt := range opts {
opt(&r.o)
}
if dataShards <= 0 || parityShards <= 0 { if dataShards <= 0 || parityShards <= 0 {
return nil, ErrInvShardNum return nil, ErrInvShardNum
} }
@ -201,7 +206,7 @@ func (r reedSolomon) Verify(shards [][]byte) (bool, error) {
// number of matrix rows used, is determined by // number of matrix rows used, is determined by
// outputCount, which is the number of outputs to compute. // outputCount, which is the number of outputs to compute.
func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {
if runtime.GOMAXPROCS(0) > 1 && len(inputs[0]) > minSplitSize { if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize {
r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount) r.codeSomeShardsP(matrixRows, inputs, outputs, outputCount, byteCount)
return return
} }
@ -209,26 +214,21 @@ func (r reedSolomon) codeSomeShards(matrixRows, inputs, outputs [][]byte, output
in := inputs[c] in := inputs[c]
for iRow := 0; iRow < outputCount; iRow++ { for iRow := 0; iRow < outputCount; iRow++ {
if c == 0 { if c == 0 {
galMulSlice(matrixRows[iRow][c], in, outputs[iRow]) galMulSlice(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2)
} else { } else {
galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2)
} }
} }
} }
} }
const (
minSplitSize = 512 // min split size per goroutine
maxGoroutines = 50 // max goroutines number for encoding & decoding
)
// Perform the same as codeSomeShards, but split the workload into // Perform the same as codeSomeShards, but split the workload into
// several goroutines. // several goroutines.
func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) { func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outputCount, byteCount int) {
var wg sync.WaitGroup var wg sync.WaitGroup
do := byteCount / maxGoroutines do := byteCount / r.o.maxGoroutines
if do < minSplitSize { if do < r.o.minSplitSize {
do = minSplitSize do = r.o.minSplitSize
} }
start := 0 start := 0
for start < byteCount { for start < byteCount {
@ -241,9 +241,9 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu
in := inputs[c] in := inputs[c]
for iRow := 0; iRow < outputCount; iRow++ { for iRow := 0; iRow < outputCount; iRow++ {
if c == 0 { if c == 0 {
galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) galMulSlice(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2)
} else { } else {
galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop]) galMulSliceXor(matrixRows[iRow][c], in[start:stop], outputs[iRow][start:stop], r.o.useSSSE3, r.o.useAVX2)
} }
} }
} }
@ -258,13 +258,36 @@ func (r reedSolomon) codeSomeShardsP(matrixRows, inputs, outputs [][]byte, outpu
// except this will check values and return // except this will check values and return
// as soon as a difference is found. // as soon as a difference is found.
func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool { func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool {
if r.o.maxGoroutines > 1 && byteCount > r.o.minSplitSize {
return r.checkSomeShardsP(matrixRows, inputs, toCheck, outputCount, byteCount)
}
outputs := make([][]byte, len(toCheck))
for i := range outputs {
outputs[i] = make([]byte, byteCount)
}
for c := 0; c < r.DataShards; c++ {
in := inputs[c]
for iRow := 0; iRow < outputCount; iRow++ {
galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2)
}
}
for i, calc := range outputs {
if !bytes.Equal(calc, toCheck[i]) {
return false
}
}
return true
}
func (r reedSolomon) checkSomeShardsP(matrixRows, inputs, toCheck [][]byte, outputCount, byteCount int) bool {
same := true same := true
var mu sync.RWMutex // For above var mu sync.RWMutex // For above
var wg sync.WaitGroup var wg sync.WaitGroup
do := byteCount / maxGoroutines do := byteCount / r.o.maxGoroutines
if do < minSplitSize { if do < r.o.minSplitSize {
do = minSplitSize do = r.o.minSplitSize
} }
start := 0 start := 0
for start < byteCount { for start < byteCount {
@ -287,7 +310,7 @@ func (r reedSolomon) checkSomeShards(matrixRows, inputs, toCheck [][]byte, outpu
mu.RUnlock() mu.RUnlock()
in := inputs[c][start : start+do] in := inputs[c][start : start+do]
for iRow := 0; iRow < outputCount; iRow++ { for iRow := 0; iRow < outputCount; iRow++ {
galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow]) galMulSliceXor(matrixRows[iRow][c], in, outputs[iRow], r.o.useSSSE3, r.o.useAVX2)
} }
} }

View File

@ -14,9 +14,43 @@ import (
"testing" "testing"
) )
func testOpts() [][]Option {
if !testing.Short() {
return [][]Option{}
}
opts := [][]Option{
{WithMaxGoroutines(1), WithMinSplitSize(500), withSSE3(false), withAVX2(false)},
{WithMaxGoroutines(5000), WithMinSplitSize(50), withSSE3(false), withAVX2(false)},
{WithMaxGoroutines(5000), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)},
{WithMaxGoroutines(1), WithMinSplitSize(500000), withSSE3(false), withAVX2(false)},
}
for _, o := range opts[:] {
if defaultOptions.useSSSE3 {
n := make([]Option, len(o), len(o)+1)
copy(n, o)
n = append(n, withSSE3(true))
opts = append(opts, n)
}
if defaultOptions.useAVX2 {
n := make([]Option, len(o), len(o)+1)
copy(n, o)
n = append(n, withAVX2(true))
opts = append(opts, n)
}
}
return opts
}
func TestEncoding(t *testing.T) { func TestEncoding(t *testing.T) {
testEncoding(t)
for _, o := range testOpts() {
testEncoding(t, o...)
}
}
func testEncoding(t *testing.T, o ...Option) {
perShard := 50000 perShard := 50000
r, err := New(10, 3) r, err := New(10, 3, o...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -56,8 +90,15 @@ func TestEncoding(t *testing.T) {
} }
func TestReconstruct(t *testing.T) { func TestReconstruct(t *testing.T) {
testReconstruct(t)
for _, o := range testOpts() {
testReconstruct(t, o...)
}
}
func testReconstruct(t *testing.T, o ...Option) {
perShard := 50000 perShard := 50000
r, err := New(10, 3) r, err := New(10, 3, o...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -122,8 +163,15 @@ func TestReconstruct(t *testing.T) {
} }
func TestVerify(t *testing.T) { func TestVerify(t *testing.T) {
testVerify(t)
for _, o := range testOpts() {
testVerify(t, o...)
}
}
func testVerify(t *testing.T, o ...Option) {
perShard := 33333 perShard := 33333
r, err := New(10, 4) r, err := New(10, 4, o...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -536,14 +584,27 @@ func BenchmarkReconstructP10x4x16M(b *testing.B) {
} }
func TestEncoderReconstruct(t *testing.T) { func TestEncoderReconstruct(t *testing.T) {
testEncoderReconstruct(t)
for _, o := range testOpts() {
testEncoderReconstruct(t, o...)
}
}
func testEncoderReconstruct(t *testing.T, o ...Option) {
// Create some sample data // Create some sample data
var data = make([]byte, 250000) var data = make([]byte, 250000)
fillRandom(data) fillRandom(data)
// Create 5 data slices of 50000 elements each // Create 5 data slices of 50000 elements each
enc, _ := New(5, 3) enc, err := New(5, 3, o...)
shards, _ := enc.Split(data) if err != nil {
err := enc.Encode(shards) t.Fatal(err)
}
shards, err := enc.Split(data)
if err != nil {
t.Fatal(err)
}
err = enc.Encode(shards)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -145,8 +145,8 @@ type rsStream struct {
// the number of data shards and parity shards that // the number of data shards and parity shards that
// you want to use. You can reuse this encoder. // you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256. // Note that the maximum number of data shards is 256.
func NewStream(dataShards, parityShards int) (StreamEncoder, error) { func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
enc, err := New(dataShards, parityShards) enc, err := New(dataShards, parityShards, o...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -161,8 +161,8 @@ func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
// the number of data shards and parity shards given. // the number of data shards and parity shards given.
// //
// This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes. // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) { func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
enc, err := New(dataShards, parityShards) enc, err := New(dataShards, parityShards, o...)
if err != nil { if err != nil {
return nil, err return nil, err
} }