wal: set PageWriter offset in file encoder
parent
bf0da78b63
commit
023f335f67
|
@ -18,6 +18,7 @@ import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"hash"
|
"hash"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/etcd/pkg/crc"
|
"github.com/coreos/etcd/pkg/crc"
|
||||||
|
@ -39,9 +40,9 @@ type encoder struct {
|
||||||
uint64buf []byte
|
uint64buf []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func newEncoder(w io.Writer, prevCrc uint32) *encoder {
|
func newEncoder(w io.Writer, prevCrc uint32, pageOffset int) *encoder {
|
||||||
return &encoder{
|
return &encoder{
|
||||||
bw: ioutil.NewPageWriter(w, walPageBytes),
|
bw: ioutil.NewPageWriter(w, walPageBytes, pageOffset),
|
||||||
crc: crc.New(prevCrc, crcTable),
|
crc: crc.New(prevCrc, crcTable),
|
||||||
// 1MB buffer
|
// 1MB buffer
|
||||||
buf: make([]byte, 1024*1024),
|
buf: make([]byte, 1024*1024),
|
||||||
|
@ -49,6 +50,15 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// newFileEncoder creates a new encoder with current file offset for the page writer.
|
||||||
|
func newFileEncoder(f *os.File, prevCrc uint32) (*encoder, error) {
|
||||||
|
offset, err := f.Seek(0, os.SEEK_CUR)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return newEncoder(f, prevCrc, int(offset)), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (e *encoder) encode(rec *walpb.Record) error {
|
func (e *encoder) encode(rec *walpb.Record) error {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
defer e.mu.Unlock()
|
defer e.mu.Unlock()
|
||||||
|
|
|
@ -69,7 +69,7 @@ func TestWriteRecord(t *testing.T) {
|
||||||
typ := int64(0xABCD)
|
typ := int64(0xABCD)
|
||||||
d := []byte("Hello world!")
|
d := []byte("Hello world!")
|
||||||
buf := new(bytes.Buffer)
|
buf := new(bytes.Buffer)
|
||||||
e := newEncoder(buf, 0)
|
e := newEncoder(buf, 0, 0)
|
||||||
e.encode(&walpb.Record{Type: typ, Data: d})
|
e.encode(&walpb.Record{Type: typ, Data: d})
|
||||||
e.flush()
|
e.flush()
|
||||||
decoder := newDecoder(ioutil.NopCloser(buf))
|
decoder := newDecoder(ioutil.NopCloser(buf))
|
||||||
|
|
20
wal/wal.go
20
wal/wal.go
|
@ -120,7 +120,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
||||||
w := &WAL{
|
w := &WAL{
|
||||||
dir: dirpath,
|
dir: dirpath,
|
||||||
metadata: metadata,
|
metadata: metadata,
|
||||||
encoder: newEncoder(f, 0),
|
}
|
||||||
|
w.encoder, err = newFileEncoder(f.File, 0)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
w.locks = append(w.locks, f)
|
w.locks = append(w.locks, f)
|
||||||
if err = w.saveCrc(0); err != nil {
|
if err = w.saveCrc(0); err != nil {
|
||||||
|
@ -341,7 +344,10 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||||
|
|
||||||
if w.tail() != nil {
|
if w.tail() != nil {
|
||||||
// create encoder (chain crc with the decoder), enable appending
|
// create encoder (chain crc with the decoder), enable appending
|
||||||
w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
w.encoder, err = newFileEncoder(w.tail().File, w.decoder.lastCRC())
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
w.decoder = nil
|
w.decoder = nil
|
||||||
|
|
||||||
|
@ -375,7 +381,10 @@ func (w *WAL) cut() error {
|
||||||
// update writer and save the previous crc
|
// update writer and save the previous crc
|
||||||
w.locks = append(w.locks, newTail)
|
w.locks = append(w.locks, newTail)
|
||||||
prevCrc := w.encoder.crc.Sum32()
|
prevCrc := w.encoder.crc.Sum32()
|
||||||
w.encoder = newEncoder(w.tail(), prevCrc)
|
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err = w.saveCrc(prevCrc); err != nil {
|
if err = w.saveCrc(prevCrc); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -414,7 +423,10 @@ func (w *WAL) cut() error {
|
||||||
w.locks[len(w.locks)-1] = newTail
|
w.locks[len(w.locks)-1] = newTail
|
||||||
|
|
||||||
prevCrc = w.encoder.crc.Sum32()
|
prevCrc = w.encoder.crc.Sum32()
|
||||||
w.encoder = newEncoder(w.tail(), prevCrc)
|
w.encoder, err = newFileEncoder(w.tail().File, prevCrc)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
plog.Infof("segmented wal file %v is created", fpath)
|
plog.Infof("segmented wal file %v is created", fpath)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -61,7 +61,7 @@ func TestNew(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var wb bytes.Buffer
|
var wb bytes.Buffer
|
||||||
e := newEncoder(&wb, 0)
|
e := newEncoder(&wb, 0, 0)
|
||||||
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
|
err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err = %v, want nil", err)
|
t.Fatalf("err = %v, want nil", err)
|
||||||
|
@ -465,7 +465,7 @@ func TestSaveEmpty(t *testing.T) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
var est raftpb.HardState
|
var est raftpb.HardState
|
||||||
w := WAL{
|
w := WAL{
|
||||||
encoder: newEncoder(&buf, 0),
|
encoder: newEncoder(&buf, 0, 0),
|
||||||
}
|
}
|
||||||
if err := w.saveState(&est); err != nil {
|
if err := w.saveState(&est); err != nil {
|
||||||
t.Errorf("err = %v, want nil", err)
|
t.Errorf("err = %v, want nil", err)
|
||||||
|
|
Loading…
Reference in New Issue