reedsolomon-go/streaming.go

/**
* Reed-Solomon Coding over 8-bit values.
*
* Copyright 2015, Klaus Post
* Copyright 2015, Backblaze, Inc.
*/
package reedsolomon
import (
"bytes"
"errors"
"fmt"
"io"
"sync"
)
// StreamEncoder is an interface to encode Reed-Solomon parity sets for your data.
// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
//
// For small shard sizes, 10MB and below, it is recommended to use the in-memory interface,
// since the streaming interface has a start-up overhead.
//
// For all operations, readers and writers should not assume any order/size of
// individual reads/writes.
//
// For usage examples, see "stream-encoder.go" and "stream-decoder.go" in the examples
// folder.
type StreamEncoder interface {
// Encode parity shards for a set of data shards.
//
// Input is 'data' containing readers for the data shards, and 'parity'
// containing writers that will receive the parity shards.
//
// The number of shards must match the number given to NewStream().
//
// Each reader must supply the same number of bytes.
//
// The parity shards will be written to the writer.
// The number of bytes written will match the input size.
//
// If a data stream returns an error, a StreamReadError type error
// will be returned. If a parity writer returns an error, a
// StreamWriteError will be returned.
Encode(data []io.Reader, parity []io.Writer) error
// Verify returns true if the parity shards contain correct data.
//
// The number of shards must match the total number of data+parity shards
// given to NewStream().
//
// Each reader must supply the same number of bytes.
// If a shard stream returns an error, a StreamReadError type error
// will be returned.
Verify(shards []io.Reader) (bool, error)
// Reconstruct will recreate the missing shards if possible.
//
// Given a list of valid shards (to read) and invalid shards (to write)
//
// You indicate that a shard is missing by setting it to nil in the 'valid'
// slice and at the same time setting a non-nil writer in 'fill'.
// An index cannot contain both a non-nil 'valid' and 'fill' entry.
// If both are provided, 'ErrReconstructMismatch' is returned.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete, but integrity is not verified.
// Use the Verify function to check if the data set is ok.
Reconstruct(valid []io.Reader, fill []io.Writer) error
// Split an input stream into the number of shards given to the encoder.
//
// The data will be split into equally sized shards.
// If the data size isn't divisible by the number of shards,
// the last shard will contain extra zeros.
//
// You must supply the total size of your input.
// 'ErrShortData' will be returned if it is unable to retrieve the
// number of bytes indicated.
Split(data io.Reader, dst []io.Writer, size int64) (err error)
// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
//
// You must supply the exact output size you want.
// If there are too few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
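// The sketch below is illustrative and not part of the original file: it shows
// one way the StreamEncoder above might be driven entirely in memory, using
// bytes.Reader/bytes.Buffer as the shard streams. The shard counts, sizes and
// the function name are assumptions for the example.
func exampleStreamEncode(payload []byte) error {
	enc, err := NewStream(4, 2)
	if err != nil {
		return err
	}
	// Pad the payload so it divides evenly into 4 data shards.
	perShard := (len(payload) + 3) / 4
	padded := make([]byte, perShard*4)
	copy(padded, payload)
	// Wrap each data shard in a reader; every reader supplies the same number of bytes.
	data := make([]io.Reader, 4)
	for i := range data {
		data[i] = bytes.NewReader(padded[i*perShard : (i+1)*perShard])
	}
	// Collect the parity shards in in-memory buffers.
	parity := make([]io.Writer, 2)
	for i := range parity {
		parity[i] = new(bytes.Buffer)
	}
	return enc.Encode(data, parity)
}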
// StreamReadError is returned when a read error is encountered
// that relates to a supplied stream.
// This will allow you to find out which reader has failed.
type StreamReadError struct {
Err error // The error
Stream int // The stream number on which the error occurred
}
// Error returns the error as a string
func (s StreamReadError) Error() string {
return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
}
// String returns the error as a string
func (s StreamReadError) String() string {
return s.Error()
}
// StreamWriteError is returned when a write error is encountered
// that relates to a supplied stream. This will allow you to
// find out which writer has failed.
type StreamWriteError struct {
Err error // The error
Stream int // The stream number on which the error occurred
}
// Error returns the error as a string
func (s StreamWriteError) Error() string {
return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
}
// String returns the error as a string
func (s StreamWriteError) String() string {
return s.Error()
}
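// Illustrative sketch, not part of the original file: one way a caller could
// use the error types above to find out which stream failed. The function name
// is an assumption for the example.
func describeStreamError(err error) string {
	switch e := err.(type) {
	case StreamReadError:
		return fmt.Sprintf("reader %d failed: %v", e.Stream, e.Err)
	case StreamWriteError:
		return fmt.Sprintf("writer %d failed: %v", e.Stream, e.Err)
	default:
		return fmt.Sprintf("unrelated error: %v", err)
	}
}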
// rsStream contains a matrix for a specific
// distribution of data shards and parity shards.
// Construct it using NewStream().
type rsStream struct {
r *reedSolomon
o options
// Shard reader
readShards func(dst [][]byte, in []io.Reader) error
// Shard writer
writeShards func(out []io.Writer, in [][]byte) error
blockPool sync.Pool
}
// NewStream creates a new encoder and initializes it to
// the number of data shards and parity shards that
// you want to use. You can reuse this encoder.
// Note that the total number of data and parity shards cannot exceed 256.
func NewStream(dataShards, parityShards int, o ...Option) (StreamEncoder, error) {
if dataShards+parityShards > 256 {
return nil, ErrMaxShardNum
}
r := rsStream{o: defaultOptions}
for _, opt := range o {
opt(&r.o)
}
// Override block size if shard size is set.
if r.o.streamBS == 0 && r.o.shardSize > 0 {
r.o.streamBS = r.o.shardSize
}
if r.o.streamBS <= 0 {
r.o.streamBS = 4 << 20
}
if r.o.shardSize == 0 && r.o.maxGoroutines == defaultOptions.maxGoroutines {
o = append(o, WithAutoGoroutines(r.o.streamBS))
}
enc, err := New(dataShards, parityShards, o...)
if err != nil {
return nil, err
}
r.r = enc.(*reedSolomon)
r.blockPool.New = func() interface{} {
out := make([][]byte, dataShards+parityShards)
for i := range out {
out[i] = make([]byte, r.o.streamBS)
}
return out
}
r.readShards = readShards
r.writeShards = writeShards
if r.o.concReads {
r.readShards = cReadShards
}
if r.o.concWrites {
r.writeShards = cWriteShards
}
return &r, err
}
// NewStreamC creates a new encoder and initializes it to
// the number of data shards and parity shards given.
//
// This functions as 'NewStream', but allows you to enable CONCURRENT reads and writes.
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool, o ...Option) (StreamEncoder, error) {
return NewStream(dataShards, parityShards, append(o, WithConcurrentStreamReads(conReads), WithConcurrentStreamWrites(conWrites))...)
}
func (r *rsStream) createSlice() [][]byte {
out := r.blockPool.Get().([][]byte)
for i := range out {
out[i] = out[i][:r.o.streamBS]
}
return out
}
// Encodes parity shards for a set of data shards.
//
// Input is 'data' containing readers for the data shards, and 'parity'
// containing writers that will receive the parity shards.
//
// The number of shards must match the number given to NewStream().
//
// Each reader must supply the same number of bytes.
//
// The parity shards will be written to the writer.
// The number of bytes written will match the input size.
//
// If a data stream returns an error, a StreamReadError type error
// will be returned. If a parity writer returns an error, a
// StreamWriteError will be returned.
func (r *rsStream) Encode(data []io.Reader, parity []io.Writer) error {
if len(data) != r.r.DataShards {
return ErrTooFewShards
}
if len(parity) != r.r.ParityShards {
return ErrTooFewShards
}
all := r.createSlice()
defer r.blockPool.Put(all)
in := all[:r.r.DataShards]
out := all[r.r.DataShards:]
read := 0
for {
err := r.readShards(in, data)
switch err {
case nil:
case io.EOF:
if read == 0 {
return ErrShardNoData
}
return nil
default:
return err
}
out = trimShards(out, shardSize(in))
read += shardSize(in)
err = r.r.Encode(all)
if err != nil {
return err
}
err = r.writeShards(parity, out)
if err != nil {
return err
}
}
}
// Trim the shards so they are all the same size
func trimShards(in [][]byte, size int) [][]byte {
for i := range in {
if len(in[i]) != 0 {
in[i] = in[i][0:size]
}
if len(in[i]) < size {
in[i] = in[i][:0]
}
}
return in
}
func readShards(dst [][]byte, in []io.Reader) error {
if len(in) != len(dst) {
panic("internal error: in and dst size do not match")
}
size := -1
for i := range in {
if in[i] == nil {
dst[i] = dst[i][:0]
continue
}
n, err := io.ReadFull(in[i], dst[i])
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
switch err {
case io.ErrUnexpectedEOF, io.EOF:
if size < 0 {
size = n
} else if n != size {
// Shard sizes must match.
return ErrShardSize
}
dst[i] = dst[i][0:n]
case nil:
continue
default:
return StreamReadError{Err: err, Stream: i}
}
}
if size == 0 {
return io.EOF
}
return nil
}
func writeShards(out []io.Writer, in [][]byte) error {
if len(out) != len(in) {
panic("internal error: in and out size do not match")
}
for i := range in {
if out[i] == nil {
continue
}
n, err := out[i].Write(in[i])
if err != nil {
return StreamWriteError{Err: err, Stream: i}
}
//
if n != len(in[i]) {
return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
}
}
return nil
}
type readResult struct {
n int
size int
err error
}
// cReadShards reads shards concurrently
func cReadShards(dst [][]byte, in []io.Reader) error {
if len(in) != len(dst) {
panic("internal error: in and dst size do not match")
}
var wg sync.WaitGroup
wg.Add(len(in))
res := make(chan readResult, len(in))
for i := range in {
if in[i] == nil {
dst[i] = dst[i][:0]
wg.Done()
continue
}
go func(i int) {
defer wg.Done()
n, err := io.ReadFull(in[i], dst[i])
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
res <- readResult{size: n, err: err, n: i}
}(i)
}
wg.Wait()
close(res)
size := -1
for r := range res {
switch r.err {
case io.ErrUnexpectedEOF, io.EOF:
if size < 0 {
size = r.size
} else if r.size != size {
// Shard sizes must match.
return ErrShardSize
}
dst[r.n] = dst[r.n][0:r.size]
case nil:
default:
return StreamReadError{Err: r.err, Stream: r.n}
}
}
if size == 0 {
return io.EOF
}
return nil
}
// cWriteShards writes shards concurrently
func cWriteShards(out []io.Writer, in [][]byte) error {
if len(out) != len(in) {
panic("internal error: in and out size do not match")
}
var errs = make(chan error, len(out))
var wg sync.WaitGroup
wg.Add(len(out))
for i := range in {
go func(i int) {
defer wg.Done()
if out[i] == nil {
errs <- nil
return
}
n, err := out[i].Write(in[i])
if err != nil {
errs <- StreamWriteError{Err: err, Stream: i}
return
}
if n != len(in[i]) {
errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
}
}(i)
}
wg.Wait()
close(errs)
for err := range errs {
if err != nil {
return err
}
}
return nil
}
// Verify returns true if the parity shards contain correct data.
//
// The number of shards must match the total number of data+parity shards
// given to NewStream().
//
// Each reader must supply the same number of bytes.
// If a shard stream returns an error, a StreamReadError type error
// will be returned.
func (r *rsStream) Verify(shards []io.Reader) (bool, error) {
if len(shards) != r.r.Shards {
return false, ErrTooFewShards
}
read := 0
all := r.createSlice()
defer r.blockPool.Put(all)
for {
err := r.readShards(all, shards)
if err == io.EOF {
if read == 0 {
return false, ErrShardNoData
}
return true, nil
}
if err != nil {
return false, err
}
read += shardSize(all)
ok, err := r.r.Verify(all)
if !ok || err != nil {
return ok, err
}
}
}
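// Illustrative sketch, not part of the original file: verifying a full set of
// shards held in memory. It assumes 'shardBytes' holds DataShards+ParityShards
// equally sized byte slices produced earlier by Split and Encode.
func exampleStreamVerify(enc StreamEncoder, shardBytes [][]byte) (bool, error) {
	shards := make([]io.Reader, len(shardBytes))
	for i := range shardBytes {
		shards[i] = bytes.NewReader(shardBytes[i])
	}
	return enc.Verify(shards)
}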
// ErrReconstructMismatch is returned by the StreamEncoder, if you supply
// "valid" and "fill" streams on the same index.
// Therefore it is impossible to tell whether you consider the shard valid
// or would like to have it reconstructed.
var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
// Reconstruct will recreate the missing shards if possible.
//
// Given a list of valid shards (to read) and invalid shards (to write)
//
// You indicate that a shard is missing by setting it to nil in the 'valid'
// slice and at the same time setting a non-nil writer in 'fill'.
// An index cannot contain both a non-nil 'valid' and 'fill' entry.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete when explicitly asked for all missing shards.
// However, its integrity is not automatically verified.
// Use the Verify function to check the data set once it is complete.
func (r *rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
if len(valid) != r.r.Shards {
return ErrTooFewShards
}
if len(fill) != r.r.Shards {
return ErrTooFewShards
}
all := r.createSlice()
defer r.blockPool.Put(all)
reconDataOnly := true
for i := range valid {
if valid[i] != nil && fill[i] != nil {
return ErrReconstructMismatch
}
if i >= r.r.DataShards && fill[i] != nil {
reconDataOnly = false
}
}
read := 0
for {
err := r.readShards(all, valid)
if err == io.EOF {
if read == 0 {
return ErrShardNoData
}
return nil
}
if err != nil {
return err
}
read += shardSize(all)
all = trimShards(all, shardSize(all))
if reconDataOnly {
err = r.r.ReconstructData(all) // just reconstruct missing data shards
} else {
err = r.r.Reconstruct(all) // reconstruct all missing shards
}
if err != nil {
return err
}
err = r.writeShards(fill, all)
if err != nil {
return err
}
}
}
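// Illustrative sketch, not part of the original file: reconstructing one lost
// shard with Reconstruct above. A missing shard is marked with a nil reader in
// 'valid' and a non-nil writer at the same index in 'fill'; all other 'fill'
// entries stay nil. 'shardBytes' (all DataShards+ParityShards shards, with the
// lost one left empty) and the lost index are assumptions for the example.
func exampleStreamReconstruct(enc StreamEncoder, shardBytes [][]byte, lost int) ([]byte, error) {
	valid := make([]io.Reader, len(shardBytes))
	fill := make([]io.Writer, len(shardBytes))
	rebuilt := new(bytes.Buffer)
	for i := range shardBytes {
		if i == lost {
			valid[i] = nil    // we no longer have this shard
			fill[i] = rebuilt // ask for it to be rewritten here
			continue
		}
		valid[i] = bytes.NewReader(shardBytes[i])
	}
	if err := enc.Reconstruct(valid, fill); err != nil {
		return nil, err
	}
	return rebuilt.Bytes(), nil
}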
// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
//
// You must supply the exact output size you want.
// If there are too few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
func (r *rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
// Do we have enough shards?
if len(shards) < r.r.DataShards {
return ErrTooFewShards
}
// Trim off parity shards if any
shards = shards[:r.r.DataShards]
for i := range shards {
if shards[i] == nil {
return StreamReadError{Err: ErrShardNoData, Stream: i}
}
}
// Join all shards
src := io.MultiReader(shards...)
// Copy data to dst
n, err := io.CopyN(dst, src, outSize)
if err == io.EOF {
return ErrShortData
}
if err != nil {
return err
}
if n != outSize {
return ErrShortData
}
return nil
}
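// Illustrative sketch, not part of the original file: joining the data shards
// back into the original stream with Join above. 'dataBytes' is assumed to hold
// only the data shards, and 'origSize' the exact size of the original input.
func exampleStreamJoin(enc StreamEncoder, dataBytes [][]byte, origSize int64) ([]byte, error) {
	shards := make([]io.Reader, len(dataBytes))
	for i := range dataBytes {
		shards[i] = bytes.NewReader(dataBytes[i])
	}
	joined := new(bytes.Buffer)
	if err := enc.Join(joined, shards, origSize); err != nil {
		return nil, err
	}
	return joined.Bytes(), nil
}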
// Split an input stream into the number of shards given to the encoder.
//
// The data will be split into equally sized shards.
// If the data size isn't divisible by the number of shards,
// the last shard will contain extra zeros.
//
// You must supply the total size of your input.
// 'ErrShortData' will be returned if it is unable to retrieve the
// number of bytes indicated.
func (r *rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
if size == 0 {
return ErrShortData
}
if len(dst) != r.r.DataShards {
return ErrInvShardNum
}
for i := range dst {
if dst[i] == nil {
return StreamWriteError{Err: ErrShardNoData, Stream: i}
}
}
// Calculate number of bytes per shard.
perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
// Pad data to r.Shards*perShard.
padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
data = io.MultiReader(data, bytes.NewBuffer(padding))
// Split into equal-length shards and copy.
for i := range dst {
n, err := io.CopyN(dst[i], data, perShard)
if err != io.EOF && err != nil {
return err
}
if n != perShard {
return ErrShortData
}
}
return nil
}
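// Illustrative sketch, not part of the original file: splitting an in-memory
// payload into data shard buffers with Split above. The returned buffers can
// then be wrapped in readers and fed to Encode to produce parity. The shard
// count is passed in here as an assumption; it must match the encoder.
func exampleStreamSplit(enc StreamEncoder, payload []byte, dataShards int) ([]*bytes.Buffer, error) {
	bufs := make([]*bytes.Buffer, dataShards)
	dst := make([]io.Writer, dataShards)
	for i := range dst {
		bufs[i] = new(bytes.Buffer)
		dst[i] = bufs[i]
	}
	if err := enc.Split(bytes.NewReader(payload), dst, int64(len(payload))); err != nil {
		return nil, err
	}
	return bufs, nil
}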