From 1d573f2541e3d6b71165ed4e22f7f6bd2f240866 Mon Sep 17 00:00:00 2001 From: klauspost Date: Tue, 27 Oct 2015 13:56:36 +0100 Subject: [PATCH] Add streaming API examples. --- examples/README.md | 11 +++ examples/stream-decoder.go | 167 +++++++++++++++++++++++++++++++++++++ examples/stream-encoder.go | 142 +++++++++++++++++++++++++++++++ 3 files changed, 320 insertions(+) create mode 100644 examples/stream-decoder.go create mode 100644 examples/stream-encoder.go diff --git a/examples/README.md b/examples/README.md index fbc8c7b..7c5ad53 100644 --- a/examples/README.md +++ b/examples/README.md @@ -15,6 +15,17 @@ go build simple-decoder.go go build simple-encoder.go ``` +# Streaming API examples + +There are streaming examples of the same functionality, which stream data instead of keeping it in memory. + +To build the executables use: + +```bash +go build stream-decoder.go +go build stream-encoder.go +``` + ## Shortcomings * If the file size of the input isn't diviable by the number of data shards the output will contain extra zeroes diff --git a/examples/stream-decoder.go b/examples/stream-decoder.go new file mode 100644 index 0000000..1766018 --- /dev/null +++ b/examples/stream-decoder.go @@ -0,0 +1,167 @@ +//+build ignore + +// Copyright 2015, Klaus Post, see LICENSE for details. +// +// Stream decoder example. +// +// The decoder reverses the process of "stream-encoder.go" +// +// To build an executable use: +// +// go build stream-decoder.go +// +// Simple Encoder/Decoder Shortcomings: +// * If the file size of the input isn't dividable by the number of data shards +// the output will contain extra zeroes +// +// * If the shard numbers isn't the same for the decoder as in the +// encoder, invalid output will be generated. +// +// * If values have changed in a shard, it cannot be reconstructed. +// +// * If two shards have been swapped, reconstruction will always fail. +// You need to supply the shards in the same order as they were given to you. 
+// +// The solution for this is to save a metadata file containing: +// +// * File size. +// * The number of data/parity shards. +// * HASH of each shard. +// * Order of the shards. +// +// If you save these properties, you should be able to detect file corruption +// in a shard and be able to reconstruct your data if you have the needed number of shards left. + +package main + +import ( + "flag" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/klauspost/reedsolomon" +) + +var dataShards = flag.Int("data", 4, "Number of shards to split the data into") +var parShards = flag.Int("par", 2, "Number of parity shards") +var outFile = flag.String("out", "", "Alternative output path/file") + +func init() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [-flags] basefile.ext\nDo not add the number to the filename.\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Valid flags:\n") + flag.PrintDefaults() + } +} + +func main() { + // Parse flags + flag.Parse() + args := flag.Args() + if len(args) != 1 { + fmt.Fprintf(os.Stderr, "Error: No filenames given\n") + flag.Usage() + os.Exit(1) + } + fname := args[0] + + // Create matrix + enc, err := reedsolomon.NewStream(*dataShards, *parShards) + checkErr(err) + + // Open the inputs + shards, size, err := openInput(*dataShards, *parShards, fname) + checkErr(err) + + // Verify the shards + ok, err := enc.Verify(shards) + if ok { + fmt.Println("No reconstruction needed") + } else { + fmt.Println("Verification failed. 
Reconstructing data") + shards, size, err = openInput(*dataShards, *parShards, fname) + checkErr(err) + // Create out destination writers + out := make([]io.Writer, len(shards)) + for i := range out { + if shards[i] == nil { + dir, _ := filepath.Split(fname) + outfn := fmt.Sprintf("%s.%d", fname, i) + fmt.Println("Creating", outfn) + out[i], err = os.Create(filepath.Join(dir, outfn)) + checkErr(err) + } + } + err = enc.Reconstruct(shards, out) + if err != nil { + fmt.Println("Reconstruct failed -", err) + os.Exit(1) + } + // Close output. + for i := range out { + if out[i] != nil { + err := out[i].(*os.File).Close() + checkErr(err) + } + } + shards, size, err = openInput(*dataShards, *parShards, fname) + ok, err = enc.Verify(shards) + if !ok { + fmt.Println("Verification failed after reconstruction, data likely corrupted:", err) + os.Exit(1) + } + checkErr(err) + } + + // Join the shards and write them + outfn := *outFile + if outfn == "" { + outfn = fname + } + + fmt.Println("Writing data to", outfn) + f, err := os.Create(outfn) + checkErr(err) + + shards, size, err = openInput(*dataShards, *parShards, fname) + checkErr(err) + + // We don't know the exact filesize. + err = enc.Join(f, shards, int64(*dataShards)*size) + checkErr(err) +} + +func openInput(dataShards, parShards int, fname string) (r []io.Reader, size int64, err error) { + // Create shards and load the data. 
+ shards := make([]io.Reader, dataShards+parShards) + for i := range shards { + infn := fmt.Sprintf("%s.%d", fname, i) + fmt.Println("Opening", infn) + f, err := os.Open(infn) + if err != nil { + fmt.Println("Error reading file", err) + shards[i] = nil + continue + } else { + shards[i] = f + } + stat, err := f.Stat() + checkErr(err) + if stat.Size() > 0 { + size = stat.Size() + } else { + shards[i] = nil + } + } + return shards, size, nil +} + +func checkErr(err error) { + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %s", err.Error()) + os.Exit(2) + } +} diff --git a/examples/stream-encoder.go b/examples/stream-encoder.go new file mode 100644 index 0000000..638fc5d --- /dev/null +++ b/examples/stream-encoder.go @@ -0,0 +1,142 @@ +//+build ignore + +// Copyright 2015, Klaus Post, see LICENSE for details. +// +// Simple stream encoder example +// +// The encoder encodes a single file into a number of shards +// To reverse the process see "stream-decoder.go" +// +// To build an executable use: +// +// go build stream-encoder.go +// +// Simple Encoder/Decoder Shortcomings: +// * If the file size of the input isn't dividable by the number of data shards +// the output will contain extra zeroes +// +// * If the shard numbers isn't the same for the decoder as in the +// encoder, invalid output will be generated. +// +// * If values have changed in a shard, it cannot be reconstructed. +// +// * If two shards have been swapped, reconstruction will always fail. +// You need to supply the shards in the same order as they were given to you. +// +// The solution for this is to save a metadata file containing: +// +// * File size. +// * The number of data/parity shards. +// * HASH of each shard. +// * Order of the shards. +// +// If you save these properties, you should be able to detect file corruption +// in a shard and be able to reconstruct your data if you have the needed number of shards left. 
+ +package main + +import ( + "flag" + "fmt" + "os" + "path/filepath" + + "io" + + "github.com/klauspost/reedsolomon" +) + +var dataShards = flag.Int("data", 4, "Number of shards to split the data into, must be below 257.") +var parShards = flag.Int("par", 2, "Number of parity shards") +var outDir = flag.String("out", "", "Alternative output directory") + +func init() { + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s [-flags] filename.ext\n\n", os.Args[0]) + fmt.Fprintf(os.Stderr, "Valid flags:\n") + flag.PrintDefaults() + } +} + +func main() { + // Parse command line parameters. + flag.Parse() + args := flag.Args() + if len(args) != 1 { + fmt.Fprintf(os.Stderr, "Error: No input filename given\n") + flag.Usage() + os.Exit(1) + } + if *dataShards > 257 { + fmt.Fprintf(os.Stderr, "Error: Too many data shards\n") + os.Exit(1) + } + fname := args[0] + + // Create encoding matrix. + enc, err := reedsolomon.NewStream(*dataShards, *parShards) + checkErr(err) + + fmt.Println("Opening", fname) + f, err := os.Open(fname) + checkErr(err) + + instat, err := f.Stat() + checkErr(err) + + shards := *dataShards + *parShards + out := make([]*os.File, shards) + + // Create the resulting files. + dir, file := filepath.Split(fname) + if *outDir != "" { + dir = *outDir + } + for i := range out { + outfn := fmt.Sprintf("%s.%d", file, i) + fmt.Println("Creating", outfn) + out[i], err = os.Create(filepath.Join(dir, outfn)) + checkErr(err) + } + + // Split into files. + data := make([]io.Writer, *dataShards) + for i := range data { + data[i] = out[i] + } + // Do the split + err = enc.Split(f, data, instat.Size()) + checkErr(err) + + // Close and re-open the files. 
+ input := make([]io.Reader, *dataShards) + + for i := range data { + out[i].Close() + f, err := os.Open(out[i].Name()) + checkErr(err) + input[i] = f + defer f.Close() + } + + // Create parity output writers + parity := make([]io.Writer, *parShards) + for i := range parity { + parity[i] = out[*dataShards+i] + defer out[*dataShards+i].Close() + } + + // Encode parity + err = enc.Encode(input, parity) + checkErr(err) + fmt.Printf("File split into %d data + %d parity shards.\n", *dataShards, *parShards) + +} + +func checkErr(err error) { + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %s", err.Error()) + os.Exit(2) + } +}