Add ReconstructData interface method (#57)
* Add ReconstructData interface method to allow reconstruction of any missing data shards * Add support for just reconstructing data shards only to SteamEncoder.Reconstruct()master
parent
0dd0a0e50c
commit
0de37d7697
23
README.md
23
README.md
|
@ -81,6 +81,17 @@ To indicate missing data, you set the shard to nil before calling `Reconstruct()
|
|||
```
|
||||
The missing data and parity shards will be recreated. If more than 3 shards are missing, the reconstruction will fail.
|
||||
|
||||
If you are only interested in the data shards (for reading purposes) you can call `ReconstructData()`:
|
||||
|
||||
```Go
|
||||
// Delete two data shards
|
||||
data[3] = nil
|
||||
data[7] = nil
|
||||
|
||||
// Reconstruct just the missing data shards
|
||||
err := enc.ReconstructData(data)
|
||||
```
|
||||
|
||||
So to sum up reconstruction:
|
||||
* The number of data/parity shards must match the numbers used for encoding.
|
||||
* The order of shards must be the same as used when encoding.
|
||||
|
@ -198,6 +209,18 @@ Example of performance scaling on Intel(R) Core(TM) i7-2600 CPU @ 3.40GHz - 4 ph
|
|||
| 4 | 3179,33 | 235% |
|
||||
| 8 | 4346,18 | 321% |
|
||||
|
||||
Benchmarking `Reconstruct()` followed by a `Verify()` (=`all`) versus just calling `ReconstructData()` (=`data`) gives the following result:
|
||||
```
|
||||
benchmark all MB/s data MB/s speedup
|
||||
BenchmarkReconstruct10x2x10000-8 2011.67 10530.10 5.23x
|
||||
BenchmarkReconstruct50x5x50000-8 4585.41 14301.60 3.12x
|
||||
BenchmarkReconstruct10x2x1M-8 8081.15 28216.41 3.49x
|
||||
BenchmarkReconstruct5x2x1M-8 5780.07 28015.37 4.85x
|
||||
BenchmarkReconstruct10x4x1M-8 4352.56 14367.61 3.30x
|
||||
BenchmarkReconstruct50x20x1M-8 1364.35 4189.79 3.07x
|
||||
BenchmarkReconstruct10x4x16M-8 1484.35 5779.53 3.89x
|
||||
```
|
||||
|
||||
# asm2plan9s
|
||||
|
||||
[asm2plan9s](https://github.com/fwessels/asm2plan9s) is used for assembling the AVX2 instructions into their BYTE/WORD/LONG equivalents.
|
||||
|
|
|
@ -49,6 +49,21 @@ type Encoder interface {
|
|||
// Use the Verify function to check if data set is ok.
|
||||
Reconstruct(shards [][]byte) error
|
||||
|
||||
// ReconstructData will recreate any missing data shards, if possible.
|
||||
//
|
||||
// Given a list of shards, some of which contain data, fills in the
|
||||
// data shards that don't have data.
|
||||
//
|
||||
// The length of the array must be equal to Shards.
|
||||
// You indicate that a shard is missing by setting it to nil.
|
||||
//
|
||||
// If there are too few shards to reconstruct the missing
|
||||
// ones, ErrTooFewShards will be returned.
|
||||
//
|
||||
// As the reconstructed shard set may contain missing parity shards,
|
||||
// calling the Verify function is likely to fail.
|
||||
ReconstructData(shards [][]byte) error
|
||||
|
||||
// Split a data slice into the number of shards given to the encoder,
|
||||
// and create empty parity shards.
|
||||
//
|
||||
|
@ -437,6 +452,35 @@ func shardSize(shards [][]byte) int {
|
|||
// The reconstructed shard set is complete, but integrity is not verified.
|
||||
// Use the Verify function to check if data set is ok.
|
||||
func (r reedSolomon) Reconstruct(shards [][]byte) error {
|
||||
return r.reconstruct(shards, false)
|
||||
}
|
||||
|
||||
// ReconstructData will recreate any missing data shards, if possible.
|
||||
//
|
||||
// Given a list of shards, some of which contain data, fills in the
|
||||
// data shards that don't have data.
|
||||
//
|
||||
// The length of the array must be equal to Shards.
|
||||
// You indicate that a shard is missing by setting it to nil.
|
||||
//
|
||||
// If there are too few shards to reconstruct the missing
|
||||
// ones, ErrTooFewShards will be returned.
|
||||
//
|
||||
// As the reconstructed shard set may contain missing parity shards,
|
||||
// calling the Verify function is likely to fail.
|
||||
func (r reedSolomon) ReconstructData(shards [][]byte) error {
|
||||
return r.reconstruct(shards, true)
|
||||
}
|
||||
|
||||
// reconstruct will recreate the missing data shards, and unless
|
||||
// dataOnly is true, also the missing parity shards
|
||||
//
|
||||
// The length of the array must be equal to Shards.
|
||||
// You indicate that a shard is missing by setting it to nil.
|
||||
//
|
||||
// If there are too few shards to reconstruct the missing
|
||||
// ones, ErrTooFewShards will be returned.
|
||||
func (r reedSolomon) reconstruct(shards [][]byte, dataOnly bool) error {
|
||||
if len(shards) != r.Shards {
|
||||
return ErrTooFewShards
|
||||
}
|
||||
|
@ -543,6 +587,11 @@ func (r reedSolomon) Reconstruct(shards [][]byte) error {
|
|||
}
|
||||
r.codeSomeShards(matrixRows, subShards, outputs[:outputCount], outputCount, shardSize)
|
||||
|
||||
if dataOnly {
|
||||
// Exit out early if we are only interested in the data shards
|
||||
return nil
|
||||
}
|
||||
|
||||
// Now that we have all of the data shards intact, we can
|
||||
// compute any of the parity that is missing.
|
||||
//
|
||||
|
|
|
@ -250,6 +250,115 @@ func testReconstruct(t *testing.T, o ...Option) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestReconstructData(t *testing.T) {
|
||||
testReconstructData(t)
|
||||
for _, o := range testOpts() {
|
||||
testReconstruct(t, o...)
|
||||
}
|
||||
}
|
||||
|
||||
func testReconstructData(t *testing.T, o ...Option) {
|
||||
perShard := 100000
|
||||
r, err := New(8, 5, o...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
shards := make([][]byte, 13)
|
||||
for s := range shards {
|
||||
shards[s] = make([]byte, perShard)
|
||||
}
|
||||
|
||||
rand.Seed(0)
|
||||
for s := 0; s < 13; s++ {
|
||||
fillRandom(shards[s])
|
||||
}
|
||||
|
||||
err = r.Encode(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Reconstruct with all shards present
|
||||
err = r.ReconstructData(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Reconstruct with 10 shards present
|
||||
shards[0] = nil
|
||||
shards[2] = nil
|
||||
shards[4] = nil
|
||||
|
||||
err = r.ReconstructData(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Since all parity shards are available, verification will succeed
|
||||
ok, err := r.Verify(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("Verification failed")
|
||||
}
|
||||
|
||||
// Reconstruct with 6 data and 4 parity shards
|
||||
shards[0] = nil
|
||||
shards[2] = nil
|
||||
shards[12] = nil
|
||||
|
||||
err = r.ReconstructData(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verification will fail now due to absence of a parity block
|
||||
_, err = r.Verify(shards)
|
||||
if err != ErrShardSize {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
|
||||
// Reconstruct with 7 data and 1 parity shards
|
||||
shards[0] = nil
|
||||
shards[9] = nil
|
||||
shards[10] = nil
|
||||
shards[11] = nil
|
||||
shards[12] = nil
|
||||
|
||||
err = r.ReconstructData(shards)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = r.Verify(shards)
|
||||
if err != ErrShardSize {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
|
||||
// Reconstruct with 6 data and 1 parity shards (should fail)
|
||||
shards[0] = nil
|
||||
shards[1] = nil
|
||||
shards[9] = nil
|
||||
shards[10] = nil
|
||||
shards[11] = nil
|
||||
shards[12] = nil
|
||||
|
||||
err = r.ReconstructData(shards)
|
||||
if err != ErrTooFewShards {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
|
||||
err = r.ReconstructData(make([][]byte, 1))
|
||||
if err != ErrTooFewShards {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
err = r.ReconstructData(make([][]byte, 13))
|
||||
if err != ErrShardNoData {
|
||||
t.Errorf("expected %v, got %v", ErrShardNoData, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReconstructPAR1Singular(t *testing.T) {
|
||||
perShard := 50
|
||||
r, err := New(4, 4, WithPAR1Matrix())
|
||||
|
@ -630,6 +739,79 @@ func BenchmarkReconstruct10x4x16M(b *testing.B) {
|
|||
benchmarkReconstruct(b, 10, 4, 16*1024*1024)
|
||||
}
|
||||
|
||||
func corruptRandomData(shards [][]byte, dataShards, parityShards int) {
|
||||
shardsToCorrupt := rand.Intn(parityShards)
|
||||
for i := 1; i <= shardsToCorrupt; i++ {
|
||||
shards[rand.Intn(dataShards)] = nil
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkReconstructData(b *testing.B, dataShards, parityShards, shardSize int) {
|
||||
r, err := New(dataShards, parityShards)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
shards := make([][]byte, parityShards+dataShards)
|
||||
for s := range shards {
|
||||
shards[s] = make([]byte, shardSize)
|
||||
}
|
||||
|
||||
rand.Seed(0)
|
||||
for s := 0; s < dataShards; s++ {
|
||||
fillRandom(shards[s])
|
||||
}
|
||||
err = r.Encode(shards)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
b.SetBytes(int64(shardSize * dataShards))
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
corruptRandomData(shards, dataShards, parityShards)
|
||||
|
||||
err = r.ReconstructData(shards)
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Benchmark 10 data slices with 2 parity slices holding 10000 bytes each
|
||||
func BenchmarkReconstructData10x2x10000(b *testing.B) {
|
||||
benchmarkReconstructData(b, 10, 2, 10000)
|
||||
}
|
||||
|
||||
// Benchmark 50 data slices with 5 parity slices holding 100000 bytes each
|
||||
func BenchmarkReconstructData50x5x50000(b *testing.B) {
|
||||
benchmarkReconstructData(b, 50, 5, 100000)
|
||||
}
|
||||
|
||||
// Benchmark 10 data slices with 2 parity slices holding 1MB bytes each
|
||||
func BenchmarkReconstructData10x2x1M(b *testing.B) {
|
||||
benchmarkReconstructData(b, 10, 2, 1024*1024)
|
||||
}
|
||||
|
||||
// Benchmark 5 data slices with 2 parity slices holding 1MB bytes each
|
||||
func BenchmarkReconstructData5x2x1M(b *testing.B) {
|
||||
benchmarkReconstructData(b, 5, 2, 1024*1024)
|
||||
}
|
||||
|
||||
// Benchmark 10 data slices with 4 parity slices holding 1MB bytes each
|
||||
func BenchmarkReconstructData10x4x1M(b *testing.B) {
|
||||
benchmarkReconstructData(b, 10, 4, 1024*1024)
|
||||
}
|
||||
|
||||
// Benchmark 5 data slices with 2 parity slices holding 1MB bytes each
|
||||
func BenchmarkReconstructData50x20x1M(b *testing.B) {
|
||||
benchmarkReconstructData(b, 50, 20, 1024*1024)
|
||||
}
|
||||
|
||||
// Benchmark 10 data slices with 4 parity slices holding 16MB bytes each
|
||||
func BenchmarkReconstructData10x4x16M(b *testing.B) {
|
||||
benchmarkReconstructData(b, 10, 4, 16*1024*1024)
|
||||
}
|
||||
|
||||
func benchmarkReconstructP(b *testing.B, dataShards, parityShards, shardSize int) {
|
||||
r, err := New(dataShards, parityShards)
|
||||
if err != nil {
|
||||
|
|
15
streaming.go
15
streaming.go
|
@ -450,8 +450,9 @@ var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutual
|
|||
// If there are too few shards to reconstruct the missing
|
||||
// ones, ErrTooFewShards will be returned.
|
||||
//
|
||||
// The reconstructed shard set is complete, but integrity is not verified.
|
||||
// Use the Verify function to check if data set is ok.
|
||||
// The reconstructed shard set is complete when explicitly asked for all missing shards.
|
||||
// However its integrity is not automatically verified.
|
||||
// Use the Verify function to check in case the data set is complete.
|
||||
func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
||||
if len(valid) != r.r.Shards {
|
||||
return ErrTooFewShards
|
||||
|
@ -461,10 +462,14 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
|||
}
|
||||
|
||||
all := createSlice(r.r.Shards, r.bs)
|
||||
reconDataOnly := true
|
||||
for i := range valid {
|
||||
if valid[i] != nil && fill[i] != nil {
|
||||
return ErrReconstructMismatch
|
||||
}
|
||||
if i >= r.r.DataShards && fill[i] != nil {
|
||||
reconDataOnly = false
|
||||
}
|
||||
}
|
||||
|
||||
read := 0
|
||||
|
@ -482,7 +487,11 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
|||
read += shardSize(all)
|
||||
all = trimShards(all, shardSize(all))
|
||||
|
||||
err = r.r.Reconstruct(all)
|
||||
if reconDataOnly {
|
||||
err = r.r.ReconstructData(all) // just reconstruct missing data shards
|
||||
} else {
|
||||
err = r.r.Reconstruct(all) // reconstruct all missing shards
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ func TestStreamReconstruct(t *testing.T) {
|
|||
|
||||
all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
|
||||
|
||||
// Reconstruct with 10 shards present
|
||||
// Reconstruct with 10 shards present, asking for all shards to be reconstructed
|
||||
all[0] = nil
|
||||
fill[0] = emptyBuffers(1)[0]
|
||||
all[7] = nil
|
||||
|
@ -240,6 +240,25 @@ func TestStreamReconstruct(t *testing.T) {
|
|||
|
||||
all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
|
||||
|
||||
// Reconstruct with 10 shards present, asking for just data shards to be reconstructed
|
||||
all[0] = nil
|
||||
fill[0] = emptyBuffers(1)[0]
|
||||
all[7] = nil
|
||||
fill[7] = emptyBuffers(1)[0]
|
||||
all[11] = nil
|
||||
fill[11] = nil
|
||||
|
||||
err = r.Reconstruct(all, fill)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if fill[11] != nil {
|
||||
t.Fatal("Unexpected parity block reconstructed")
|
||||
}
|
||||
|
||||
all = append(toReaders(toBuffers(shards)), toReaders(toBuffers(parity))...)
|
||||
|
||||
// Reconstruct with 9 shards present (should fail)
|
||||
all[0] = nil
|
||||
fill[0] = emptyBuffers(1)[0]
|
||||
|
|
Loading…
Reference in New Issue