reedsolomon-go/streaming.go

531 lines
14 KiB
Go
Raw Normal View History

2015-10-23 17:14:55 +03:00
/**
* Reed-Solomon Coding over 8-bit values.
*
* Copyright 2015, Klaus Post
* Copyright 2015, Backblaze, Inc.
*/
// Reed-Solomon Erasure Coding in Go
//
// For usage and examples, see https://github.com/klauspost/reedsolomon
//
package reedsolomon
import (
"bytes"
"errors"
"fmt"
2015-10-23 17:14:55 +03:00
"io"
"sync"
2015-10-23 17:14:55 +03:00
)
// StreamEncoder is an interface to encode Reed-Salomon parity sets for your data.
// It provides a fully streaming interface, and processes data in blocks of up to 4MB.
type StreamEncoder interface {
// Encodes parity shards for a set of data shards.
// Input is 'shards' containing readers for data shards followed by parity shards
// io.Writer.
// The number of shards must match the number given to New().
// Each shard is a byte array, and they must all be the same size.
// The parity shards will always be overwritten and the data shards
// will remain the same, so it is safe for you to read from the
// data shards while this is running.
Encode(data []io.Reader, parity []io.Writer) error
// Verify returns true if the parity shards contain correct data.
// The data is the same format as Encode. No data is modified, so
// you are allowed to read from data while this is running.
Verify(shards []io.Reader) (bool, error)
// Reconstruct will recreate the missing shards if possible.
//
// Given a list of valid shards (to read) and invalid shards (to write)
//
// The length of each slice must be equal to the total number of shards.
// You indicate that a shard is missing by setting it to nil.
//
// You indicate that you would like to have a shard filled by setting
// a non-nil writer in "fill". An index cannot contain both non-nil
// 'valid' and 'fill' entry.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete, but integrity is not verified.
// Use the Verify function to check if data set is ok.
Reconstruct(valid []io.Reader, fill []io.Writer) error
// Split a data slice into the number of shards given to the encoder.
//
// You must supply the total size of your input.
//
// The data will be split into equally sized shards.
// If the data size isn't dividable by the number of shards,
// the last shard will contain extra zeros.
//
// There must be at least the same number of bytes as there are data shards,
// otherwise ErrShortData will be returned.
Split(data io.Reader, dst []io.Writer, size int64) (err error)
// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
//
// You must supply the exact output size you want.
// If there are to few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
Join(dst io.Writer, shards []io.Reader, outSize int64) error
}
// StreamReadError is returned when a read error is encountered
// that relates to a supplied stream. This will allow you to
// find out which reader/writer has failed.
type StreamError struct {
Err error // The error
Stream int // The stream number on which the error occurred
Op string // Will be "read" or "write".
}
// Error returns the error as a string
func (s StreamError) Error() string {
return fmt.Sprintf("error on %s stream #%d: %s", s.Op, s.Stream, s.Err)
}
// String returns the error as a string
func (s StreamError) String() string {
return s.Error()
}
2015-10-23 17:14:55 +03:00
// reedSolomon contains a matrix for a specific
// distribution of datashards and parity shards.
// Construct if using New()
type rsStream struct {
r *reedSolomon
bs int // Block size
// Shard reader
readShards func(dst [][]byte, in []io.Reader) error
// Shard writer
writeShards func(out []io.Writer, in [][]byte) error
creads bool
cwrites bool
2015-10-23 17:14:55 +03:00
}
// NewStream creates a new encoder and initializes it to
2015-10-23 17:14:55 +03:00
// the number of data shards and parity shards that
// you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256.
func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
enc, err := New(dataShards, parityShards)
if err != nil {
return nil, err
}
rs := enc.(*reedSolomon)
r := rsStream{r: rs, bs: 4 << 20}
r.readShards = readShards
r.writeShards = writeShards
return &r, err
}
// NewStreamC creates a new encoder and initializes it to
// the number of data shards and parity shards that
// you want to use. You can reuse this encoder.
// Note that the maximum number of data shards is 256.
// This allows you to enable concurrent reads and writes
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) {
enc, err := New(dataShards, parityShards)
if err != nil {
return nil, err
}
rs := enc.(*reedSolomon)
r := rsStream{r: rs, bs: 4 << 20}
r.readShards = readShards
r.writeShards = writeShards
if conReads {
r.readShards = cReadShards
}
if conWrites {
r.writeShards = cWriteShards
}
2015-10-23 17:14:55 +03:00
return &r, err
}
func createSlice(n, length int) [][]byte {
out := make([][]byte, n)
for i := range out {
out[i] = make([]byte, length)
}
return out
}
// Encodes parity for a set of data shards.
// An array 'shards' containing data shards followed by parity shards.
// The number of shards must match the number given to New.
// Each shard is a byte array, and they must all be the same size.
// The parity shards will always be overwritten and the data shards
// will remain the same.
func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
if len(data) != r.r.DataShards {
return ErrTooFewShards
}
if len(parity) != r.r.ParityShards {
return ErrTooFewShards
}
all := createSlice(r.r.Shards, r.bs)
in := all[:r.r.DataShards]
out := all[r.r.DataShards:]
read := 0
2015-10-23 17:14:55 +03:00
for {
err := r.readShards(in, data)
2015-10-23 17:14:55 +03:00
switch err {
case nil:
case io.EOF:
if read == 0 {
return ErrShardNoData
}
2015-10-23 17:14:55 +03:00
return nil
default:
return err
}
out = trimShards(out, shardSize(in))
read += shardSize(in)
2015-10-23 17:14:55 +03:00
err = r.r.Encode(all)
if err != nil {
return err
}
err = r.writeShards(parity, out)
2015-10-23 17:14:55 +03:00
if err != nil {
return err
}
}
}
// Trim the shards so they are all the same size
func trimShards(in [][]byte, size int) [][]byte {
for i := range in {
if in[i] != nil {
in[i] = in[i][0:size]
}
if len(in[i]) < size {
in[i] = nil
}
}
return in
}
2015-10-23 17:14:55 +03:00
func readShards(dst [][]byte, in []io.Reader) error {
if len(in) != len(dst) {
panic("internal error: in and dst size does not match")
}
size := -1
for i := range in {
if in[i] == nil {
dst[i] = nil
continue
}
n, err := io.ReadFull(in[i], dst[i])
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
switch err {
case io.ErrUnexpectedEOF, io.EOF:
if size < 0 {
size = n
} else if n != size {
// Shard sizes must match.
return StreamError{Err: ErrShardSize, Op: "read", Stream: i}
2015-10-23 17:14:55 +03:00
}
dst[i] = dst[i][0:n]
case nil:
continue
2015-10-23 17:14:55 +03:00
default:
return StreamError{Err: err, Op: "read", Stream: i}
2015-10-23 17:14:55 +03:00
}
}
if size == 0 {
return io.EOF
}
return nil
}
func writeShards(out []io.Writer, in [][]byte) error {
if len(out) != len(in) {
panic("internal error: in and out size does not match")
}
for i := range in {
if out[i] == nil {
continue
}
n, err := out[i].Write(in[i])
if err != nil {
return StreamError{Err: err, Op: "write", Stream: i}
2015-10-23 17:14:55 +03:00
}
//
if n != len(in[i]) {
return StreamError{Err: io.ErrShortWrite, Op: "write", Stream: i}
2015-10-23 17:14:55 +03:00
}
}
return nil
}
type readResult struct {
n int
size int
err error
}
// cReadShards reads shards concurrently
func cReadShards(dst [][]byte, in []io.Reader) error {
if len(in) != len(dst) {
panic("internal error: in and dst size does not match")
}
var wg sync.WaitGroup
wg.Add(len(in))
res := make(chan readResult, len(in))
for i := range in {
if in[i] == nil {
dst[i] = nil
wg.Done()
continue
}
go func(i int) {
defer wg.Done()
n, err := io.ReadFull(in[i], dst[i])
// The error is EOF only if no bytes were read.
// If an EOF happens after reading some but not all the bytes,
// ReadFull returns ErrUnexpectedEOF.
res <- readResult{size: n, err: err, n: i}
}(i)
}
wg.Wait()
close(res)
size := -1
for r := range res {
switch r.err {
case io.ErrUnexpectedEOF, io.EOF:
if size < 0 {
size = r.size
} else if r.size != size {
// Shard sizes must match.
return StreamError{Err: ErrShardSize, Op: "read", Stream: r.n}
}
dst[r.n] = dst[r.n][0:r.size]
case nil:
default:
return StreamError{Err: r.err, Op: "read", Stream: r.n}
}
}
if size == 0 {
return io.EOF
}
return nil
}
// cWriteShards writes shards concurrently
func cWriteShards(out []io.Writer, in [][]byte) error {
if len(out) != len(in) {
panic("internal error: in and out size does not match")
}
var errs = make(chan error, len(out))
var wg sync.WaitGroup
wg.Add(len(out))
for i := range in {
go func(i int) {
defer wg.Done()
if out[i] == nil {
errs <- nil
return
}
n, err := out[i].Write(in[i])
if err != nil {
errs <- StreamError{Err: err, Op: "write", Stream: i}
return
}
if n != len(in[i]) {
errs <- StreamError{Err: io.ErrShortWrite, Op: "write", Stream: i}
}
}(i)
}
wg.Wait()
close(errs)
for err := range errs {
if err != nil {
return err
}
}
return nil
}
2015-10-23 17:14:55 +03:00
// Verify returns true if the parity shards contain the right data.
// The data is the same format as Encode. No data is modified.
func (r rsStream) Verify(shards []io.Reader) (bool, error) {
if len(shards) != r.r.Shards {
return false, ErrTooFewShards
}
read := 0
2015-10-23 17:14:55 +03:00
all := createSlice(r.r.Shards, r.bs)
for {
err := r.readShards(all, shards)
2015-10-23 17:14:55 +03:00
if err == io.EOF {
if read == 0 {
return false, ErrShardNoData
}
2015-10-23 17:14:55 +03:00
return true, nil
}
if err != nil {
return false, err
}
read += shardSize(all)
2015-10-23 17:14:55 +03:00
ok, err := r.r.Verify(all)
if !ok || err != nil {
return ok, err
}
}
}
// This error is returned by the StreamEncoder, if you supply "valid" and "fill" streams
// on the same index.
// Therefore it is impossible to see if you consider the shard valid
// or would like to have it reconstructed.
var ErrReconstructMismatch = errors.New("valid shards and fill shards are mutually exclusive")
2015-10-23 17:14:55 +03:00
// Reconstruct will recreate the missing shards, if possible.
//
// Given a list of shards, some of which contain data, fills in the
// ones that don't have data.
//
// The length of the array must be equal to Shards.
// You indicate that a shard is missing by setting it to nil.
//
// If there are too few shards to reconstruct the missing
// ones, ErrTooFewShards will be returned.
//
// The reconstructed shard set is complete, but integrity is not verified.
// Use the Verify function to check if data set is ok.
func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
if len(valid) != r.r.Shards {
return ErrTooFewShards
}
if len(fill) != r.r.Shards {
return ErrTooFewShards
}
all := createSlice(r.r.Shards, r.bs)
for i := range valid {
if valid[i] != nil && fill[i] != nil {
return ErrReconstructMismatch
}
}
read := 0
2015-10-23 17:14:55 +03:00
for {
err := r.readShards(all, valid)
2015-10-23 17:14:55 +03:00
if err == io.EOF {
if read == 0 {
return ErrShardNoData
}
2015-10-23 17:14:55 +03:00
return nil
}
if err != nil {
return err
}
read += shardSize(all)
all = trimShards(all, shardSize(all))
2015-10-23 17:14:55 +03:00
err = r.r.Reconstruct(all)
if err != nil {
return err
}
err = r.writeShards(fill, all)
2015-10-23 17:14:55 +03:00
if err != nil {
return err
}
}
}
// Join the shards and write the data segment to dst.
//
// Only the data shards are considered.
// You must supply the exact output size you want.
// If there are to few shards given, ErrTooFewShards will be returned.
// If the total data size is less than outSize, ErrShortData will be returned.
func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
// Do we have enough shards?
if len(shards) < r.r.DataShards {
return ErrTooFewShards
}
// Trim off parity shards if any
shards = shards[:r.r.DataShards]
for i := range shards {
if shards[i] == nil {
return StreamError{Err: ErrShardNoData, Op: "read", Stream: i}
2015-10-23 17:14:55 +03:00
}
}
// Join all shards
src := io.MultiReader(shards...)
// Copy data to dst
n, err := io.CopyN(dst, src, outSize)
if err == io.EOF {
return ErrShortData
}
if err != nil {
return err
}
if n != outSize {
return ErrShortData
}
return nil
}
// Split a data slice into the number of shards given to the encoder,
// and create empty parity shards.
//
// The data will be split into equally sized shards.
// If the data size isn't divisible by the number of shards,
// the last shard will contain extra zeros.
//
// There must be at least the same number of bytes as there are data shards,
// otherwise ErrShortData will be returned.
func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
if size < int64(r.r.DataShards) {
return ErrShortData
}
if len(dst) != r.r.DataShards {
return ErrInvShardNum
}
for i := range dst {
if dst[i] == nil {
return StreamError{Err: ErrShardNoData, Op: "write", Stream: i}
2015-10-23 17:14:55 +03:00
}
}
// Calculate number of bytes per shard.
perShard := (size + int64(r.r.DataShards) - 1) / int64(r.r.DataShards)
// Pad data to r.Shards*perShard.
padding := make([]byte, (int64(r.r.Shards)*perShard)-size)
data = io.MultiReader(data, bytes.NewBuffer(padding))
// Split into equal-length shards and copy.
for i := range dst {
n, err := io.CopyN(dst[i], data, perShard)
if err != io.EOF && err != nil {
return StreamError{Err: err, Op: "write", Stream: i}
2015-10-23 17:14:55 +03:00
}
if n != perShard {
return ErrShortData
}
}
return nil
}