Fix stream allocations (#129)

The numbers speak for themselves (benchmark names encode dataShards x parityShards x shardSize; the -32 suffix is GOMAXPROCS):

```
benchmark                                old ns/op     new ns/op     delta
BenchmarkStreamEncode10x2x10000-32       4792420       7937          -99.83%
BenchmarkStreamEncode100x20x10000-32     38424066      473285        -98.77%
BenchmarkStreamEncode17x3x1M-32          8195036       1482191       -81.91%
BenchmarkStreamEncode10x4x16M-32         21356715      18051773      -15.47%
BenchmarkStreamEncode5x2x1M-32           3295827       412301        -87.49%
BenchmarkStreamEncode10x2x1M-32          5249011       798828        -84.78%
BenchmarkStreamEncode10x4x1M-32          6392974       904818        -85.85%
BenchmarkStreamEncode50x20x1M-32         29083474      7199282       -75.25%
BenchmarkStreamEncode17x3x16M-32         32451850      28036421      -13.61%
BenchmarkStreamVerify10x2x10000-32       4858416       12988         -99.73%
BenchmarkStreamVerify50x5x50000-32       17047361      377003        -97.79%
BenchmarkStreamVerify10x2x1M-32          4869964       887214        -81.78%
BenchmarkStreamVerify5x2x1M-32           3282999       591669        -81.98%
BenchmarkStreamVerify10x4x1M-32          5824392       1230888       -78.87%
BenchmarkStreamVerify50x20x1M-32         27301648      6204613       -77.27%
BenchmarkStreamVerify10x4x16M-32         8508963       18845695      +121.48%

benchmark                                old MB/s     new MB/s     speedup
BenchmarkStreamEncode10x2x10000-32       20.87        12599.82     603.73x
BenchmarkStreamEncode100x20x10000-32     26.03        2112.89      81.17x
BenchmarkStreamEncode17x3x1M-32          2175.19      12026.65     5.53x
BenchmarkStreamEncode10x4x16M-32         7855.71      9293.94      1.18x
BenchmarkStreamEncode5x2x1M-32           1590.76      12716.14     7.99x
BenchmarkStreamEncode10x2x1M-32          1997.66      13126.43     6.57x
BenchmarkStreamEncode10x4x1M-32          1640.20      11588.81     7.07x
BenchmarkStreamEncode50x20x1M-32         1802.70      7282.50      4.04x
BenchmarkStreamEncode17x3x16M-32         8788.80      10172.93     1.16x
BenchmarkStreamVerify10x2x10000-32       20.58        7699.20      374.11x
BenchmarkStreamVerify50x5x50000-32       293.30       13262.49     45.22x
BenchmarkStreamVerify10x2x1M-32          2153.15      11818.75     5.49x
BenchmarkStreamVerify5x2x1M-32           1596.98      8861.17      5.55x
BenchmarkStreamVerify10x4x1M-32          1800.32      8518.86      4.73x
BenchmarkStreamVerify50x20x1M-32         1920.35      8449.97      4.40x
BenchmarkStreamVerify10x4x16M-32         19717.11     8902.41      0.45x
```
Klaus Post, 2020-05-05 16:35:35 +02:00 (committed via GitHub to master)
commit abb309aca7, parent dccac354fe
2 changed files with 97 additions and 34 deletions

options.go:

```diff
@@ -17,6 +17,11 @@ type options struct {
 	useCauchy bool
 	shardSize int
 	perRound  int
+
+	// stream options
+	concReads  bool
+	concWrites bool
+	streamBS   int
 }
 
 var defaultOptions = options{
@@ -74,6 +79,43 @@ func WithMinSplitSize(n int) Option {
 	}
 }
 
+// WithConcurrentStreams will enable concurrent reads and writes on the streams.
+// Default: Disabled, meaning only one stream will be read/written at a time.
+// Ignored if not used on a stream input.
+func WithConcurrentStreams(enabled bool) Option {
+	return func(o *options) {
+		o.concReads, o.concWrites = enabled, enabled
+	}
+}
+
+// WithConcurrentStreamReads will enable concurrent reads from the input streams.
+// Default: Disabled, meaning only one stream will be read at a time.
+// Ignored if not used on a stream input.
+func WithConcurrentStreamReads(enabled bool) Option {
+	return func(o *options) {
+		o.concReads = enabled
+	}
+}
+
+// WithConcurrentStreamWrites will enable concurrent writes to the output streams.
+// Default: Disabled, meaning only one stream will be written at a time.
+// Ignored if not used on a stream input.
+func WithConcurrentStreamWrites(enabled bool) Option {
+	return func(o *options) {
+		o.concWrites = enabled
+	}
+}
+
+// WithStreamBlockSize allows setting a custom block size per round of reads/writes.
+// If not set, any shard size set with WithAutoGoroutines will be used.
+// If WithAutoGoroutines is also unset, 4MB will be used.
+// Ignored if not used on a stream input.
+func WithStreamBlockSize(n int) Option {
+	return func(o *options) {
+		o.streamBS = n
+	}
+}
+
 func withSSSE3(enabled bool) Option {
 	return func(o *options) {
 		o.useSSSE3 = enabled
```
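The new options compose with the package's existing functional-options API. A minimal sketch of opting in (the shard counts and block size are arbitrary example values, not recommendations):

```go
package main

import (
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	// Stream encoder with concurrent reads/writes and a 1 MB block size.
	// 10 data + 4 parity shards are arbitrary example values.
	enc, err := reedsolomon.NewStream(10, 4,
		reedsolomon.WithConcurrentStreams(true),
		reedsolomon.WithStreamBlockSize(1<<20),
	)
	if err != nil {
		log.Fatal(err)
	}
	_ = enc // use enc.Encode / enc.Verify / enc.Reconstruct ...
}
```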

streams.go:

```diff
@@ -131,12 +131,15 @@ func (s StreamWriteError) String() string {
 // distribution of datashards and parity shards.
 // Construct if using NewStream()
 type rsStream struct {
-	r  *reedSolomon
-	bs int // Block size
+	r *reedSolomon
+	o options
+
 	// Shard reader
 	readShards func(dst [][]byte, in []io.Reader) error
 	// Shard writer
 	writeShards func(out []io.Writer, in [][]byte) error
+
+	blockPool sync.Pool
 }
 
 // NewStream creates a new encoder and initializes it to
@@ -144,14 +147,43 @@ type rsStream struct {
 // you want to use. You can reuse this encoder.
 // Note that the maximum number of data shards is 256.
 func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
+	r := rsStream{o: defaultOptions}
+	for _, opt := range o {
+		opt(&r.o)
+	}
+
+	// Override block size if shard size is set.
+	if r.o.streamBS == 0 && r.o.shardSize > 0 {
+		r.o.streamBS = r.o.shardSize
+	}
+	if r.o.streamBS <= 0 {
+		r.o.streamBS = 4 << 20
+	}
+
+	if r.o.shardSize == 0 && r.o.maxGoroutines == defaultOptions.maxGoroutines {
+		o = append(o, WithAutoGoroutines(r.o.streamBS))
+	}
+
 	enc, err := New(dataShards, parityShards, o...)
 	if err != nil {
 		return nil, err
 	}
-	rs := enc.(*reedSolomon)
-	r := rsStream{r: rs, bs: 4 << 20}
+	r.r = enc.(*reedSolomon)
+	r.blockPool.New = func() interface{} {
+		out := make([][]byte, dataShards+parityShards)
+		for i := range out {
+			out[i] = make([]byte, r.o.streamBS)
+		}
+		return out
+	}
 	r.readShards = readShards
 	r.writeShards = writeShards
+	if r.o.concReads {
+		r.readShards = cReadShards
+	}
+	if r.o.concWrites {
+		r.writeShards = cWriteShards
+	}
 	return &r, err
 }
```
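This hunk is the core of the allocation fix: NewStream now seeds a sync.Pool whose New function allocates one full block of dataShards+parityShards buffers, each streamBS bytes. It also explains the receiver change in the hunks below: rsStream now embeds a sync.Pool, which must not be copied after first use, so the methods move from value to pointer receivers. A stripped-down sketch of the same pattern outside this package (the constants are arbitrary):

```go
package main

import (
	"fmt"
	"sync"
)

const (
	shards  = 14      // e.g. 10 data + 4 parity shards
	blockSz = 4 << 20 // 4 MB, the default stream block size
)

// blockPool hands out pre-sized shard blocks; Get reuses a block
// returned by an earlier Put, or calls New to allocate a fresh one.
var blockPool = sync.Pool{
	New: func() interface{} {
		out := make([][]byte, shards)
		for i := range out {
			out[i] = make([]byte, blockSz)
		}
		return out
	},
}

func main() {
	for round := 0; round < 3; round++ {
		all := blockPool.Get().([][]byte)
		// ... read data shards into all[:10], compute parity into all[10:] ...
		fmt.Println("round", round, "buffers:", len(all))
		blockPool.Put(all) // recycle: later rounds allocate nothing
	}
}
```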
```diff
@@ -160,27 +192,13 @@ func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error)
 //
 // This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
 func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
-	enc, err := New(dataShards, parityShards, o...)
-	if err != nil {
-		return nil, err
-	}
-	rs := enc.(*reedSolomon)
-	r := rsStream{r: rs, bs: 4 << 20}
-	r.readShards = readShards
-	r.writeShards = writeShards
-	if conReads {
-		r.readShards = cReadShards
-	}
-	if conWrites {
-		r.writeShards = cWriteShards
-	}
-	return &r, err
+	return NewStream(dataShards, parityShards, append(o, WithConcurrentStreamReads(conReads), WithConcurrentStreamWrites(conWrites))...)
 }
 
-func createSlice(n, length int) [][]byte {
-	out := make([][]byte, n)
+func (r *rsStream) createSlice() [][]byte {
+	out := r.blockPool.Get().([][]byte)
 	for i := range out {
-		out[i] = make([]byte, length)
+		out[i] = out[i][:r.o.streamBS]
 	}
 	return out
 }
```
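createSlice becomes a method that recycles a block from the pool instead of allocating: each buffer is resliced back to full streamBS length, which costs nothing because the backing array's capacity is unchanged even if an earlier round trimmed the length. A tiny illustration of that reslice semantics:

```go
package main

import "fmt"

func main() {
	buf := make([]byte, 4<<20) // allocated once; cap stays 4 MB
	buf = buf[:100]            // a short final round trims the length
	buf = buf[:4<<20]          // safe: reslicing up to cap restores full length
	fmt.Println(len(buf), cap(buf))
}
```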
```diff
@@ -200,7 +218,7 @@ func createSlice(n, length int) [][]byte {
 // If a data stream returns an error, a StreamReadError type error
 // will be returned. If a parity writer returns an error, a
 // StreamWriteError will be returned.
-func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
+func (r *rsStream) Encode(data []io.Reader, parity []io.Writer) error {
 	if len(data) != r.r.DataShards {
 		return ErrTooFewShards
 	}
@@ -209,7 +227,8 @@ func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
 		return ErrTooFewShards
 	}
 
-	all := createSlice(r.r.Shards, r.bs)
+	all := r.createSlice()
+	defer r.blockPool.Put(all)
 	in := all[:r.r.DataShards]
 	out := all[r.r.DataShards:]
 	read := 0
```
```diff
@@ -242,11 +261,11 @@ func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
 // Trim the shards so they are all the same size
 func trimShards(in [][]byte, size int) [][]byte {
 	for i := range in {
-		if in[i] != nil {
+		if len(in[i]) != 0 {
 			in[i] = in[i][0:size]
 		}
 		if len(in[i]) < size {
-			in[i] = nil
+			in[i] = in[i][:0]
 		}
 	}
 	return in
```
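This trimShards change is what makes pooling safe: a missing or short shard used to be marked with nil, which would discard the pooled backing array; resetting the length to zero keeps the allocation alive, and since len of a nil slice is 0, the len-based checks treat both forms identically. Illustrated:

```go
package main

import "fmt"

func main() {
	s := make([]byte, 1024)

	s = s[:0]                   // empty, but the 1 KB backing array survives for reuse
	fmt.Println(len(s), cap(s)) // 0 1024

	s = nil                     // empty too, but the allocation is gone
	fmt.Println(len(s), cap(s)) // 0 0
}
```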
```diff
@@ -259,7 +278,7 @@ func readShards(dst [][]byte, in []io.Reader) error {
 	size := -1
 	for i := range in {
 		if in[i] == nil {
-			dst[i] = nil
+			dst[i] = dst[i][:0]
 			continue
 		}
 		n, err := io.ReadFull(in[i], dst[i])
@@ -323,7 +342,7 @@ func cReadShards(dst [][]byte, in []io.Reader) error {
 	res := make(chan readResult, len(in))
 	for i := range in {
 		if in[i] == nil {
-			dst[i] = nil
+			dst[i] = dst[i][:0]
 			wg.Done()
 			continue
 		}
```
```diff
@@ -405,13 +424,14 @@ func cWriteShards(out []io.Writer, in [][]byte) error {
 // Each reader must supply the same number of bytes.
 // If a shard stream returns an error, a StreamReadError type error
 // will be returned.
-func (r rsStream) Verify(shards []io.Reader) (bool, error) {
+func (r *rsStream) Verify(shards []io.Reader) (bool, error) {
 	if len(shards) != r.r.Shards {
 		return false, ErrTooFewShards
 	}
 
 	read := 0
-	all := createSlice(r.r.Shards, r.bs)
+	all := r.createSlice()
+	defer r.blockPool.Put(all)
 	for {
 		err := r.readShards(all, shards)
 		if err == io.EOF {
@@ -451,7 +471,7 @@ var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
 // The reconstructed shard set is complete when explicitly asked for all missing shards.
 // However its integrity is not automatically verified.
 // Use the Verify function to check in case the data set is complete.
-func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
+func (r *rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
 	if len(valid) != r.r.Shards {
 		return ErrTooFewShards
 	}
@@ -459,7 +479,8 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
 		return ErrTooFewShards
 	}
 
-	all := createSlice(r.r.Shards, r.bs)
+	all := r.createSlice()
+	defer r.blockPool.Put(all)
 	reconDataOnly := true
 	for i := range valid {
 		if valid[i] != nil && fill[i] != nil {
@@ -507,7 +528,7 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
 // You must supply the exact output size you want.
 // If there are too few shards given, ErrTooFewShards will be returned.
 // If the total data size is less than outSize, ErrShortData will be returned.
-func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
+func (r *rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
 	// Do we have enough shards?
 	if len(shards) < r.r.DataShards {
 		return ErrTooFewShards
 	}
@@ -546,7 +567,7 @@ func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
 // You must supply the total size of your input.
 // 'ErrShortData' will be returned if it is unable to retrieve the
 // number of bytes indicated.
-func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
+func (r *rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
 	if size == 0 {
 		return ErrShortData
 	}
```
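For context, a minimal end-to-end sketch of the stream API these hunks touch, using in-memory readers and writers (shard counts and payload are arbitrary example values):

```go
package main

import (
	"bytes"
	"io"
	"log"

	"github.com/klauspost/reedsolomon"
)

func main() {
	enc, err := reedsolomon.NewStream(4, 2) // 4 data + 2 parity, arbitrary
	if err != nil {
		log.Fatal(err)
	}

	// Four equally sized data shards; each reader must supply the same
	// number of bytes.
	payload := bytes.Repeat([]byte("reuse me"), 1024)
	data := make([]io.Reader, 4)
	for i := range data {
		data[i] = bytes.NewReader(payload)
	}

	// Two parity sinks.
	bufs := make([]bytes.Buffer, 2)
	parity := make([]io.Writer, 2)
	for i := range parity {
		parity[i] = &bufs[i]
	}

	if err := enc.Encode(data, parity); err != nil {
		log.Fatal(err)
	}
	log.Printf("parity shard size: %d bytes", bufs[0].Len())
}
```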