diff --git a/wal/encoder.go b/wal/encoder.go index 4a6603654..efe58928c 100644 --- a/wal/encoder.go +++ b/wal/encoder.go @@ -18,6 +18,7 @@ import ( "encoding/binary" "hash" "io" + "os" "sync" "github.com/coreos/etcd/pkg/crc" @@ -39,9 +40,9 @@ type encoder struct { uint64buf []byte } -func newEncoder(w io.Writer, prevCrc uint32) *encoder { +func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder { return &encoder{ - bw: ioutil.NewPageWriter(w, walPageBytes), + bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset), crc: crc.New(prevCrc, crcTable), // 1MB buffer buf: make([]byte, 1024*1024), @@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder { } } +// newFileEncoder creates a new encoder with current file offset for the page writer. +func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) { + offset, err := f.Seek(0, os.SEEK_CUR) + if err != nil { + return nil, err + } + return newEncoder(f, prevCrc, int(offset)), nil +} + func (e *encoder) encode(rec *walpb.Record) error { e.mu.Lock() defer e.mu.Unlock() diff --git a/wal/record_test.go b/wal/record_test.go index ddbc37d86..2a6904a81 100644 --- a/wal/record_test.go +++ b/wal/record_test.go @@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) { typ := int64(0xABCD) d := []byte("Hello world!") buf := new(bytes.Buffer) - e := newEncoder(buf, 0) + e := newEncoder(buf, 0, 0) e.encode(&walpb.Record{Type: typ, Data: d}) e.flush() decoder := newDecoder(ioutil.NopCloser(buf)) diff --git a/wal/wal.go b/wal/wal.go index 0bd85c166..7719e930b 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -122,7 +122,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) { w := &WAL{ dir: dirpath, metadata: metadata, - encoder: newEncoder(f, 0), + } + w.encoder, err = newFileEncoder(f.File, 0) + if err != nil { + return nil, err } w.locks = append(w.locks, f) if err = w.saveCrc(0); err != nil { @@ -343,7 +346,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. if w.tail() != nil { // create encoder (chain crc with the decoder), enable appending - w.encoder = newEncoder(w.tail(), w.decoder.lastCRC()) + w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC()) + if err != nil { + return + } } w.decoder = nil @@ -377,7 +383,10 @@ func (w *WAL) cut() error { // update writer and save the previous crc w.locks = append(w.locks, newTail) prevCrc := w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder, err = newFileEncoder(w.tail().File, prevCrc) + if err != nil { + return err + } if err = w.saveCrc(prevCrc); err != nil { return err } @@ -416,7 +425,10 @@ func (w *WAL) cut() error { w.locks[len(w.locks)-1] = newTail prevCrc = w.encoder.crc.Sum32() - w.encoder = newEncoder(w.tail(), prevCrc) + w.encoder, err = newFileEncoder(w.tail().File, prevCrc) + if err != nil { + return err + } plog.Infof("segmented wal file %v is created", fpath) return nil diff --git a/wal/wal_test.go b/wal/wal_test.go index 6fc6f782e..a762ee255 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -61,7 +61,7 @@ func TestNew(t *testing.T) { } var wb bytes.Buffer - e := newEncoder(&wb, 0) + e := newEncoder(&wb, 0, 0) err = e.encode(&walpb.Record{Type: crcType, Crc: 0}) if err != nil { t.Fatalf("err = %v, want nil", err) @@ -528,7 +528,7 @@ func TestSaveEmpty(t *testing.T) { var buf bytes.Buffer var est raftpb.HardState w := WAL{ - encoder: newEncoder(&buf, 0), + encoder: newEncoder(&buf, 0, 0), } if err := w.saveState(&est); err != nil { t.Errorf("err = %v, want nil", err)