Create separate read/write errors.
parent
ff9a5aff03
commit
7d91d8d953
51
streaming.go
51
streaming.go
|
@ -79,20 +79,37 @@ type StreamEncoder interface {
|
|||
|
||||
// StreamReadError is returned when a read error is encountered
|
||||
// that relates to a supplied stream. This will allow you to
|
||||
// find out which reader/writer has failed.
|
||||
type StreamError struct {
|
||||
Err error // The error
|
||||
Stream int // The stream number on which the error occurred
|
||||
Op string // Will be "read" or "write".
|
||||
// find out which reader has failed.
|
||||
type StreamReadError struct {
|
||||
Err error // The error
|
||||
Stream int // The stream number on which the error occurred
|
||||
}
|
||||
|
||||
// Error returns the error as a string
|
||||
func (s StreamError) Error() string {
|
||||
return fmt.Sprintf("error on %s stream #%d: %s", s.Op, s.Stream, s.Err)
|
||||
func (s StreamReadError) Error() string {
|
||||
return fmt.Sprintf("error reading stream %d: %s", s.Stream, s.Err)
|
||||
}
|
||||
|
||||
// String returns the error as a string
|
||||
func (s StreamError) String() string {
|
||||
func (s StreamReadError) String() string {
|
||||
return s.Error()
|
||||
}
|
||||
|
||||
// StreamWriteError is returned when a write error is encountered
|
||||
// that relates to a supplied stream. This will allow you to
|
||||
// find out which reader has failed.
|
||||
type StreamWriteError struct {
|
||||
Err error // The error
|
||||
Stream int // The stream number on which the error occurred
|
||||
}
|
||||
|
||||
// Error returns the error as a string
|
||||
func (s StreamWriteError) Error() string {
|
||||
return fmt.Sprintf("error writing stream %d: %s", s.Stream, s.Err)
|
||||
}
|
||||
|
||||
// String returns the error as a string
|
||||
func (s StreamWriteError) String() string {
|
||||
return s.Error()
|
||||
}
|
||||
|
||||
|
@ -241,7 +258,7 @@ func readShards(dst [][]byte, in []io.Reader) error {
|
|||
case nil:
|
||||
continue
|
||||
default:
|
||||
return StreamError{Err: err, Op: "read", Stream: i}
|
||||
return StreamReadError{Err: err, Stream: i}
|
||||
}
|
||||
}
|
||||
if size == 0 {
|
||||
|
@ -260,11 +277,11 @@ func writeShards(out []io.Writer, in [][]byte) error {
|
|||
}
|
||||
n, err := out[i].Write(in[i])
|
||||
if err != nil {
|
||||
return StreamError{Err: err, Op: "write", Stream: i}
|
||||
return StreamWriteError{Err: err, Stream: i}
|
||||
}
|
||||
//
|
||||
if n != len(in[i]) {
|
||||
return StreamError{Err: io.ErrShortWrite, Op: "write", Stream: i}
|
||||
return StreamWriteError{Err: io.ErrShortWrite, Stream: i}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -315,7 +332,7 @@ func cReadShards(dst [][]byte, in []io.Reader) error {
|
|||
dst[r.n] = dst[r.n][0:r.size]
|
||||
case nil:
|
||||
default:
|
||||
return StreamError{Err: r.err, Op: "read", Stream: r.n}
|
||||
return StreamReadError{Err: r.err, Stream: r.n}
|
||||
}
|
||||
}
|
||||
if size == 0 {
|
||||
|
@ -341,11 +358,11 @@ func cWriteShards(out []io.Writer, in [][]byte) error {
|
|||
}
|
||||
n, err := out[i].Write(in[i])
|
||||
if err != nil {
|
||||
errs <- StreamError{Err: err, Op: "write", Stream: i}
|
||||
errs <- StreamWriteError{Err: err, Stream: i}
|
||||
return
|
||||
}
|
||||
if n != len(in[i]) {
|
||||
errs <- StreamError{Err: io.ErrShortWrite, Op: "write", Stream: i}
|
||||
errs <- StreamWriteError{Err: io.ErrShortWrite, Stream: i}
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
@ -464,7 +481,7 @@ func (r rsStream) Join(dst io.Writer, shards []io.Reader, outSize int64) error {
|
|||
shards = shards[:r.r.DataShards]
|
||||
for i := range shards {
|
||||
if shards[i] == nil {
|
||||
return StreamError{Err: ErrShardNoData, Op: "read", Stream: i}
|
||||
return StreamReadError{Err: ErrShardNoData, Stream: i}
|
||||
}
|
||||
}
|
||||
// Join all shards
|
||||
|
@ -504,7 +521,7 @@ func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
|
|||
|
||||
for i := range dst {
|
||||
if dst[i] == nil {
|
||||
return StreamError{Err: ErrShardNoData, Op: "write", Stream: i}
|
||||
return StreamWriteError{Err: ErrShardNoData, Stream: i}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -519,7 +536,7 @@ func (r rsStream) Split(data io.Reader, dst []io.Writer, size int64) error {
|
|||
for i := range dst {
|
||||
n, err := io.CopyN(dst[i], data, perShard)
|
||||
if err != io.EOF && err != nil {
|
||||
return StreamError{Err: err, Op: "write", Stream: i}
|
||||
return err
|
||||
}
|
||||
if n != perShard {
|
||||
return ErrShortData
|
||||
|
|
|
@ -599,12 +599,15 @@ func TestStreamSplitJoin(t *testing.T) {
|
|||
bufs := toReaders(emptyBuffers(5))
|
||||
bufs[2] = nil
|
||||
err = enc.Join(buf, bufs, 0)
|
||||
if se, ok := err.(StreamError); ok {
|
||||
if se, ok := err.(StreamReadError); ok {
|
||||
if se.Err != ErrShardNoData {
|
||||
t.Errorf("expected %v, got %v", ErrShardNoData, se.Err)
|
||||
}
|
||||
if se.Stream != 2 {
|
||||
t.Errorf("Expected error on stream 2, got %d", se.Stream)
|
||||
}
|
||||
} else {
|
||||
t.Errorf("expected error type %T, got %T", StreamError{}, err)
|
||||
t.Errorf("expected error type %T, got %T", StreamReadError{}, err)
|
||||
}
|
||||
|
||||
err = enc.Join(buf, toReaders(toBuffers(splits)), int64(len(data)+1))
|
||||
|
|
Loading…
Reference in New Issue