wal: fix tail corruption
On ReadAll, WAL seeks to the end of the last record in the tail. If the tail did not end with preallocated space, the decoder would report 0 as the last offset and begin writing at offset 0 of the tail. Fixes #4903release-3.0
parent
307cb5167c
commit
bfe3a3d08e
|
@ -60,17 +60,8 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
l, err := readInt64(d.brs[0])
|
l, err := readInt64(d.brs[0])
|
||||||
if err == io.EOF {
|
if err == io.EOF || (err == nil && l == 0) {
|
||||||
d.brs = d.brs[1:]
|
// hit end of file or preallocated space
|
||||||
d.lastValidOff = 0
|
|
||||||
return d.decodeRecord(rec)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if l == 0 {
|
|
||||||
// hit preallocated space
|
|
||||||
d.brs = d.brs[1:]
|
d.brs = d.brs[1:]
|
||||||
if len(d.brs) == 0 {
|
if len(d.brs) == 0 {
|
||||||
return io.EOF
|
return io.EOF
|
||||||
|
@ -78,6 +69,10 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
|
||||||
d.lastValidOff = 0
|
d.lastValidOff = 0
|
||||||
return d.decodeRecord(rec)
|
return d.decodeRecord(rec)
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
data := make([]byte, l)
|
data := make([]byte, l)
|
||||||
if _, err = io.ReadFull(d.brs[0], data); err != nil {
|
if _, err = io.ReadFull(d.brs[0], data); err != nil {
|
||||||
// ReadFull returns io.EOF only if no bytes were read
|
// ReadFull returns io.EOF only if no bytes were read
|
||||||
|
|
|
@ -55,9 +55,6 @@ func Repair(dirpath string) bool {
|
||||||
continue
|
continue
|
||||||
case io.EOF:
|
case io.EOF:
|
||||||
return true
|
return true
|
||||||
case ErrZeroTrailer:
|
|
||||||
plog.Noticef("found zero trailer in %v", f.Name())
|
|
||||||
fallthrough
|
|
||||||
case io.ErrUnexpectedEOF:
|
case io.ErrUnexpectedEOF:
|
||||||
plog.Noticef("repairing %v", f.Name())
|
plog.Noticef("repairing %v", f.Name())
|
||||||
bf, bferr := os.Create(f.Name() + ".broken")
|
bf, bferr := os.Create(f.Name() + ".broken")
|
||||||
|
|
|
@ -57,7 +57,6 @@ var (
|
||||||
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
ErrCRCMismatch = errors.New("wal: crc mismatch")
|
||||||
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
|
||||||
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
|
||||||
ErrZeroTrailer = errors.New("wal: zero trailer")
|
|
||||||
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
crcTable = crc32.MakeTable(crc32.Castagnoli)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -273,9 +272,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == ErrZeroTrailer {
|
|
||||||
err = io.EOF
|
|
||||||
}
|
|
||||||
switch w.tail() {
|
switch w.tail() {
|
||||||
case nil:
|
case nil:
|
||||||
// We do not have to read out all entries in read mode.
|
// We do not have to read out all entries in read mode.
|
||||||
|
|
|
@ -533,3 +533,69 @@ func TestReleaseLockTo(t *testing.T) {
|
||||||
t.Errorf("lockindex = %d, want %d", lockIndex, 10)
|
t.Errorf("lockindex = %d, want %d", lockIndex, 10)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
|
||||||
|
func TestTailWriteNoSlackSpace(t *testing.T) {
|
||||||
|
p, err := ioutil.TempDir(os.TempDir(), "waltest")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(p)
|
||||||
|
|
||||||
|
// create initial WAL
|
||||||
|
w, err := Create(p, []byte("metadata"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// write some entries
|
||||||
|
for i := 1; i <= 5; i++ {
|
||||||
|
es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
|
||||||
|
if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// get rid of slack space by truncating file
|
||||||
|
off, serr := w.tail().Seek(0, os.SEEK_CUR)
|
||||||
|
if serr != nil {
|
||||||
|
t.Fatal(serr)
|
||||||
|
}
|
||||||
|
if terr := w.tail().Truncate(off); terr != nil {
|
||||||
|
t.Fatal(terr)
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
|
||||||
|
// open, write more
|
||||||
|
w, err = Open(p, walpb.Snapshot{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, _, ents, rerr := w.ReadAll()
|
||||||
|
if rerr != nil {
|
||||||
|
t.Fatal(rerr)
|
||||||
|
}
|
||||||
|
if len(ents) != 5 {
|
||||||
|
t.Fatalf("got entries %+v, expected 5 entries", ents)
|
||||||
|
}
|
||||||
|
// write more entries
|
||||||
|
for i := 6; i <= 10; i++ {
|
||||||
|
es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
|
||||||
|
if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
|
||||||
|
// confirm all writes
|
||||||
|
w, err = Open(p, walpb.Snapshot{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, _, ents, rerr = w.ReadAll()
|
||||||
|
if rerr != nil {
|
||||||
|
t.Fatal(rerr)
|
||||||
|
}
|
||||||
|
if len(ents) != 10 {
|
||||||
|
t.Fatalf("got entries %+v, expected 10 entries", ents)
|
||||||
|
}
|
||||||
|
w.Close()
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue