Add concurrent reads/writes to streaming interface.
parent
4bc47caf71
commit
0500314cc5
135
streaming.go
135
streaming.go
|
@ -15,6 +15,7 @@ import (
|
|||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// StreamEncoder is an interface to encode Reed-Salomon parity sets for your data.
|
||||
|
@ -81,9 +82,15 @@ type StreamEncoder interface {
|
|||
type rsStream struct {
|
||||
r *reedSolomon
|
||||
bs int // Block size
|
||||
// Shard reader
|
||||
readShards func(dst [][]byte, in []io.Reader) error
|
||||
// Shard writer
|
||||
writeShards func(out []io.Writer, in [][]byte) error
|
||||
creads bool
|
||||
cwrites bool
|
||||
}
|
||||
|
||||
// New creates a new encoder and initializes it to
|
||||
// NewStream creates a new encoder and initializes it to
|
||||
// the number of data shards and parity shards that
|
||||
// you want to use. You can reuse this encoder.
|
||||
// Note that the maximum number of data shards is 256.
|
||||
|
@ -94,6 +101,31 @@ func NewStream(dataShards, parityShards int) (StreamEncoder, error) {
|
|||
}
|
||||
rs := enc.(*reedSolomon)
|
||||
r := rsStream{r: rs, bs: 4 << 20}
|
||||
r.readShards = readShards
|
||||
r.writeShards = writeShards
|
||||
return &r, err
|
||||
}
|
||||
|
||||
// NewStreamC creates a new encoder and initializes it to
|
||||
// the number of data shards and parity shards that
|
||||
// you want to use. You can reuse this encoder.
|
||||
// Note that the maximum number of data shards is 256.
|
||||
// This allows you to enable concurrent reads and writes
|
||||
func NewStreamC(dataShards, parityShards int, conReads, conWrites bool) (StreamEncoder, error) {
|
||||
enc, err := New(dataShards, parityShards)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rs := enc.(*reedSolomon)
|
||||
r := rsStream{r: rs, bs: 4 << 20}
|
||||
r.readShards = readShards
|
||||
r.writeShards = writeShards
|
||||
if conReads {
|
||||
r.readShards = cReadShards
|
||||
}
|
||||
if conWrites {
|
||||
r.writeShards = cWriteShards
|
||||
}
|
||||
return &r, err
|
||||
}
|
||||
|
||||
|
@ -126,7 +158,7 @@ func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
|
|||
read := 0
|
||||
|
||||
for {
|
||||
err := readShards(in, data)
|
||||
err := r.readShards(in, data)
|
||||
switch err {
|
||||
case nil:
|
||||
case io.EOF:
|
||||
|
@ -143,7 +175,7 @@ func (r rsStream) Encode(data []io.Reader, parity []io.Writer) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = writeShards(parity, out)
|
||||
err = r.writeShards(parity, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -218,6 +250,97 @@ func writeShards(out []io.Writer, in [][]byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type readResult struct {
|
||||
size int
|
||||
err error
|
||||
}
|
||||
|
||||
// cReadShards reads shards concurrently
|
||||
func cReadShards(dst [][]byte, in []io.Reader) error {
|
||||
if len(in) != len(dst) {
|
||||
panic("internal error: in and dst size does not match")
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(in))
|
||||
res := make(chan readResult, len(in))
|
||||
for i := range in {
|
||||
if in[i] == nil {
|
||||
dst[i] = nil
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
n, err := io.ReadFull(in[i], dst[i])
|
||||
// The error is EOF only if no bytes were read.
|
||||
// If an EOF happens after reading some but not all the bytes,
|
||||
// ReadFull returns ErrUnexpectedEOF.
|
||||
res <- readResult{size: n, err: err}
|
||||
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
close(res)
|
||||
size := -1
|
||||
i := 0
|
||||
for r := range res {
|
||||
switch r.err {
|
||||
case io.ErrUnexpectedEOF, io.EOF:
|
||||
if size < 0 {
|
||||
size = r.size
|
||||
} else if r.size != size {
|
||||
// Shard sizes must match.
|
||||
return ErrShardSize
|
||||
}
|
||||
dst[i] = dst[i][0:r.size]
|
||||
case nil:
|
||||
default:
|
||||
return r.err
|
||||
}
|
||||
i++
|
||||
}
|
||||
if size == 0 {
|
||||
return io.EOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// cWriteShards writes shards concurrently
|
||||
func cWriteShards(out []io.Writer, in [][]byte) error {
|
||||
if len(out) != len(in) {
|
||||
panic("internal error: in and out size does not match")
|
||||
}
|
||||
var errs = make(chan error, len(out))
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(out))
|
||||
for i := range in {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
if out[i] == nil {
|
||||
errs <- nil
|
||||
return
|
||||
}
|
||||
n, err := out[i].Write(in[i])
|
||||
if err != nil {
|
||||
errs <- err
|
||||
return
|
||||
}
|
||||
if n != len(in[i]) {
|
||||
errs <- io.ErrShortWrite
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
close(errs)
|
||||
for err := range errs {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Verify returns true if the parity shards contain the right data.
|
||||
// The data is the same format as Encode. No data is modified.
|
||||
func (r rsStream) Verify(shards []io.Reader) (bool, error) {
|
||||
|
@ -228,7 +351,7 @@ func (r rsStream) Verify(shards []io.Reader) (bool, error) {
|
|||
read := 0
|
||||
all := createSlice(r.r.Shards, r.bs)
|
||||
for {
|
||||
err := readShards(all, shards)
|
||||
err := r.readShards(all, shards)
|
||||
if err == io.EOF {
|
||||
if read == 0 {
|
||||
return false, ErrShardNoData
|
||||
|
@ -282,7 +405,7 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
|||
|
||||
read := 0
|
||||
for {
|
||||
err := readShards(all, valid)
|
||||
err := r.readShards(all, valid)
|
||||
if err == io.EOF {
|
||||
if read == 0 {
|
||||
return ErrShardNoData
|
||||
|
@ -299,7 +422,7 @@ func (r rsStream) Reconstruct(valid []io.Reader, fill []io.Writer) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = writeShards(fill, all)
|
||||
err = r.writeShards(fill, all)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -65,6 +65,57 @@ func TestStreamEncoding(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStreamEncodingConcurrent(t *testing.T) {
|
||||
perShard := 10 << 20
|
||||
if testing.Short() {
|
||||
perShard = 50000
|
||||
}
|
||||
r, err := NewStreamC(10, 3, true, true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
rand.Seed(0)
|
||||
input := randomBytes(10, perShard)
|
||||
data := toBuffers(input)
|
||||
par := emptyBuffers(3)
|
||||
|
||||
err = r.Encode(toReaders(data), toWriters(par))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// Reset Data
|
||||
data = toBuffers(input)
|
||||
|
||||
all := append(toReaders(data), toReaders(par)...)
|
||||
ok, err := r.Verify(all)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok {
|
||||
t.Fatal("Verification failed")
|
||||
}
|
||||
|
||||
err = r.Encode(toReaders(emptyBuffers(1)), toWriters(emptyBuffers(1)))
|
||||
if err != ErrTooFewShards {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(1)))
|
||||
if err != ErrTooFewShards {
|
||||
t.Errorf("expected %v, got %v", ErrTooFewShards, err)
|
||||
}
|
||||
err = r.Encode(toReaders(emptyBuffers(10)), toWriters(emptyBuffers(3)))
|
||||
if err != ErrShardNoData {
|
||||
t.Errorf("expected %v, got %v", ErrShardNoData, err)
|
||||
}
|
||||
|
||||
badShards := emptyBuffers(10)
|
||||
badShards[0] = randomBuffer(123)
|
||||
err = r.Encode(toReaders(badShards), toWriters(emptyBuffers(3)))
|
||||
if err != ErrShardSize {
|
||||
t.Errorf("expected %v, got %v", ErrShardSize, err)
|
||||
}
|
||||
}
|
||||
|
||||
func randomBuffer(length int) *bytes.Buffer {
|
||||
b := make([]byte, length)
|
||||
for i := range b {
|
||||
|
|
Loading…
Reference in New Issue