wal: add missing logs, improve pipeline test coverage

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
release-3.4
Gyuho Lee 2018-05-17 10:48:57 -07:00
parent 610dd0911d
commit e15ce28168
7 changed files with 273 additions and 49 deletions

View File

@ -75,7 +75,11 @@ func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
return nil, err
}
if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
if fp.lg != nil {
fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
} else {
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
}
f.Close()
return nil, err
}

73
wal/file_pipeline_test.go Normal file
View File

@ -0,0 +1,73 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"io/ioutil"
"math"
"os"
"testing"
"go.uber.org/zap"
)
func TestFilePipeline(t *testing.T) {
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tdir)
fp := newFilePipeline(zap.NewExample(), tdir, SegmentSizeBytes)
defer fp.Close()
f, ferr := fp.Open()
if ferr != nil {
t.Fatal(ferr)
}
f.Close()
}
func TestFilePipelineFailPreallocate(t *testing.T) {
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tdir)
fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64)
defer fp.Close()
f, ferr := fp.Open()
if f != nil || ferr == nil { // no space left on device
t.Fatal("expected error on invalid pre-allocate size, but no error")
}
}
func TestFilePipelineFailLockFile(t *testing.T) {
tdir, err := ioutil.TempDir(os.TempDir(), "wal-test")
if err != nil {
t.Fatal(err)
}
os.RemoveAll(tdir)
fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64)
defer fp.Close()
f, ferr := fp.Open()
if f != nil || ferr == nil { // no such file or directory
t.Fatal("expected error on invalid pre-allocate size, but no error")
}
}

View File

@ -34,6 +34,12 @@ func Repair(lg *zap.Logger, dirpath string) bool {
}
defer f.Close()
if lg != nil {
lg.Info("repairing", zap.String("path", f.Name()))
} else {
plog.Noticef("repairing %v", f.Name())
}
rec := &walpb.Record{}
decoder := newDecoder(f)
for {
@ -55,18 +61,16 @@ func Repair(lg *zap.Logger, dirpath string) bool {
continue
case io.EOF:
if lg != nil {
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF))
}
return true
case io.ErrUnexpectedEOF:
if lg != nil {
lg.Info("repairing", zap.String("path", f.Name()))
} else {
plog.Noticef("repairing %v", f.Name())
}
bf, bferr := os.Create(f.Name() + ".broken")
if bferr != nil {
if lg != nil {
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"))
lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr))
} else {
plog.Errorf("could not repair %v, failed to create backup file", f.Name())
}
@ -76,7 +80,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
if _, err = f.Seek(0, io.SeekStart); err != nil {
if lg != nil {
lg.Warn("failed to read file", zap.String("path", f.Name()))
lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to read file", f.Name())
}
@ -85,7 +89,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
if _, err = io.Copy(bf, f); err != nil {
if lg != nil {
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()))
lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to copy file", f.Name())
}
@ -94,25 +98,30 @@ func Repair(lg *zap.Logger, dirpath string) bool {
if err = f.Truncate(lastOffset); err != nil {
if lg != nil {
lg.Warn("failed to truncate", zap.String("path", f.Name()))
lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to truncate file", f.Name())
}
return false
}
if err = fileutil.Fsync(f.File); err != nil {
if lg != nil {
lg.Warn("failed to fsync", zap.String("path", f.Name()))
lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair %v, failed to sync file", f.Name())
}
return false
}
if lg != nil {
lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF))
}
return true
default:
if lg != nil {
lg.Warn("failed to repair", zap.Error(err))
lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err))
} else {
plog.Errorf("could not repair error (%v)", err)
}
@ -123,7 +132,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
// openLast opens the last wal file for read and write.
func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) {
names, err := readWalNames(lg, dirpath)
names, err := readWALNames(lg, dirpath)
if err != nil {
return nil, err
}

View File

@ -49,6 +49,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
t.Fatal(err)
}
defer os.RemoveAll(p)
// create WAL
w, err := Create(zap.NewExample(), p, nil)
defer func() {
@ -90,7 +91,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect
// repair the wal
if ok := Repair(zap.NewExample(), p); !ok {
t.Fatalf("fix = %t, want %t", ok, true)
t.Fatalf("'Repair' returned '%v', want 'true'", ok)
}
// read it back
@ -184,3 +185,57 @@ func TestRepairWriteTearMiddle(t *testing.T) {
}
testRepair(t, ents, corruptf, 1)
}
func TestRepairFailDeleteDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
w, err := Create(zap.NewExample(), p, nil)
if err != nil {
t.Fatal(err)
}
oldSegmentSizeBytes := SegmentSizeBytes
SegmentSizeBytes = 64
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
for _, es := range makeEnts(50) {
if err = w.Save(raftpb.HardState{}, es); err != nil {
t.Fatal(err)
}
}
_, serr := w.tail().Seek(0, io.SeekCurrent)
if serr != nil {
t.Fatal(serr)
}
w.Close()
f, err := openLast(zap.NewExample(), p)
if err != nil {
t.Fatal(err)
}
if terr := f.Truncate(20); terr != nil {
t.Fatal(err)
}
f.Close()
w, err = Open(zap.NewExample(), p, walpb.Snapshot{})
if err != nil {
t.Fatal(err)
}
_, _, _, err = w.ReadAll()
if err != io.ErrUnexpectedEOF {
t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF)
}
w.Close()
os.RemoveAll(p)
if Repair(zap.NewExample(), p) {
t.Fatal("expect 'Repair' fail on unexpected directory deletion")
}
}

View File

@ -20,15 +20,15 @@ import (
"strings"
"github.com/coreos/etcd/pkg/fileutil"
"go.uber.org/zap"
)
var (
badWalName = errors.New("bad wal name")
)
var errBadWALName = errors.New("bad wal name")
func Exist(dirpath string) bool {
names, err := fileutil.ReadDir(dirpath)
// Exist returns true if there are any files in a given directory.
func Exist(dir string) bool {
names, err := fileutil.ReadDir(dir)
if err != nil {
return false
}
@ -38,12 +38,16 @@ func Exist(dirpath string) bool {
// searchIndex returns the last array index of names whose raft index section is
// equal to or smaller than the given index.
// The given names MUST be sorted.
func searchIndex(names []string, index uint64) (int, bool) {
func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) {
for i := len(names) - 1; i >= 0; i-- {
name := names[i]
_, curIndex, err := parseWalName(name)
_, curIndex, err := parseWALName(name)
if err != nil {
plog.Panicf("parse correct name should never fail: %v", err)
if lg != nil {
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
} else {
plog.Panicf("parse correct name should never fail: %v", err)
}
}
if index >= curIndex {
return i, true
@ -54,12 +58,16 @@ func searchIndex(names []string, index uint64) (int, bool) {
// names should have been sorted based on sequence number.
// isValidSeq checks whether seq increases continuously.
func isValidSeq(names []string) bool {
func isValidSeq(lg *zap.Logger, names []string) bool {
var lastSeq uint64
for _, name := range names {
curSeq, _, err := parseWalName(name)
curSeq, _, err := parseWALName(name)
if err != nil {
plog.Panicf("parse correct name should never fail: %v", err)
if lg != nil {
lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err))
} else {
plog.Panicf("parse correct name should never fail: %v", err)
}
}
if lastSeq != 0 && lastSeq != curSeq-1 {
return false
@ -69,7 +77,7 @@ func isValidSeq(names []string) bool {
return true
}
func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) {
func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) {
names, err := fileutil.ReadDir(dirpath)
if err != nil {
return nil, err
@ -84,7 +92,7 @@ func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) {
func checkWalNames(lg *zap.Logger, names []string) []string {
wnames := make([]string, 0)
for _, name := range names {
if _, _, err := parseWalName(name); err != nil {
if _, _, err := parseWALName(name); err != nil {
// don't complain about left over tmp files
if !strings.HasSuffix(name, ".tmp") {
if lg != nil {
@ -103,9 +111,9 @@ func checkWalNames(lg *zap.Logger, names []string) []string {
return wnames
}
func parseWalName(str string) (seq, index uint64, err error) {
func parseWALName(str string) (seq, index uint64, err error) {
if !strings.HasSuffix(str, ".wal") {
return 0, 0, badWalName
return 0, 0, errBadWALName
}
_, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index)
return seq, index, err

View File

@ -172,7 +172,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
return nil, err
}
if w, err = w.renameWal(tmpdirpath); err != nil {
if w, err = w.renameWAL(tmpdirpath); err != nil {
if lg != nil {
lg.Warn(
"failed to rename the temporary WAL directory",
@ -223,7 +223,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
return w, nil
}
func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
if err := os.RemoveAll(w.dir); err != nil {
return nil, err
}
@ -235,7 +235,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
// process holds the lock.
if err := os.Rename(tmpdirpath, w.dir); err != nil {
if _, ok := err.(*os.LinkError); ok {
return w.renameWalUnlock(tmpdirpath)
return w.renameWALUnlock(tmpdirpath)
}
return nil, err
}
@ -245,23 +245,24 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
return w, err
}
func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
// rename of directory with locked files doesn't work on windows/cifs;
// close the WAL to release the locks so the directory can be renamed.
if w.lg != nil {
w.lg.Info(
"releasing flock to rename",
"closing WAL to release flock and retry directory renaming",
zap.String("from", tmpdirpath),
zap.String("to", w.dir),
)
} else {
plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
}
w.Close()
if err := os.Rename(tmpdirpath, w.dir); err != nil {
return nil, err
}
// reopen and relock
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
if oerr != nil {
@ -298,13 +299,13 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
}
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
names, err := readWalNames(lg, dirpath)
names, err := readWALNames(lg, dirpath)
if err != nil {
return nil, err
}
nameIndex, ok := searchIndex(names, snap.Index)
if !ok || !isValidSeq(names[nameIndex:]) {
nameIndex, ok := searchIndex(lg, names, snap.Index)
if !ok || !isValidSeq(lg, names[nameIndex:]) {
return nil, ErrFileNotFound
}
@ -350,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
// write reuses the file descriptors from read; don't close so
// WAL can append without dropping the file lock
w.readClose = nil
if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
closer()
return nil, err
}
@ -386,14 +387,17 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
ents = append(ents[:e.Index-w.start.Index-1], e)
}
w.enti = e.Index
case stateType:
state = mustUnmarshalState(rec.Data)
case metadataType:
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
state.Reset()
return nil, state, nil, ErrMetadataConflict
}
metadata = rec.Data
case crcType:
crc := decoder.crc.Sum32()
// current crc of decoder must match the crc of the record.
@ -403,6 +407,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
return nil, state, nil, ErrCRCMismatch
}
decoder.updateCRC(rec.Crc)
case snapshotType:
var snap walpb.Snapshot
pbutil.MustUnmarshal(&snap, rec.Data)
@ -413,6 +418,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}
match = true
}
default:
state.Reset()
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
@ -483,9 +489,11 @@ func (w *WAL) cut() error {
if serr != nil {
return serr
}
if err := w.tail().Truncate(off); err != nil {
return err
}
if err := w.sync(); err != nil {
return err
}
@ -505,15 +513,19 @@ func (w *WAL) cut() error {
if err != nil {
return err
}
if err = w.saveCrc(prevCrc); err != nil {
return err
}
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
return err
}
if err = w.saveState(&w.state); err != nil {
return err
}
// atomically move temp wal file to wal file
if err = w.sync(); err != nil {
return err
@ -550,7 +562,7 @@ func (w *WAL) cut() error {
}
if w.lg != nil {
w.lg.Info("created a new WAL segment", zap.String("path", fpath))
} else {
plog.Infof("segmented wal file %v is created", fpath)
}
@ -578,8 +590,8 @@ func (w *WAL) sync() error {
plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
}
}
syncDurations.Observe(took.Seconds())
return err
}
@ -597,9 +609,8 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
var smaller int
found := false
for i, l := range w.locks {
_, lockIndex, err := parseWalName(filepath.Base(l.Name()))
_, lockIndex, err := parseWALName(filepath.Base(l.Name()))
if err != nil {
return err
}
@ -631,6 +642,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
return nil
}
// Close closes the current WAL file and directory.
func (w *WAL) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
@ -651,7 +663,7 @@ func (w *WAL) Close() error {
}
if err := l.Close(); err != nil {
if w.lg != nil {
w.lg.Error("failed to close WAL", zap.Error(err))
w.lg.Warn("failed to close WAL", zap.Error(err))
} else {
plog.Errorf("failed to unlock during closing wal: %s", err)
}
@ -750,7 +762,7 @@ func (w *WAL) seq() uint64 {
if t == nil {
return 0
}
seq, _, err := parseWalName(filepath.Base(t.Name()))
seq, _, err := parseWALName(filepath.Base(t.Name()))
if err != nil {
if w.lg != nil {
w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))

View File

@ -18,6 +18,7 @@ import (
"bytes"
"io"
"io/ioutil"
"math"
"os"
"path/filepath"
"reflect"
@ -85,6 +86,39 @@ func TestNew(t *testing.T) {
}
}
func TestCreateFailFromPollutedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary)
_, err = Create(zap.NewExample(), p, []byte("data"))
if err != os.ErrExist {
t.Fatalf("expected %v, got %v", os.ErrExist, err)
}
}
func TestCreateFailFromNoSpaceLeft(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
oldSegmentSizeBytes := SegmentSizeBytes
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
SegmentSizeBytes = math.MaxInt64
_, err = Create(zap.NewExample(), p, []byte("data"))
if err == nil { // no space left on device
t.Fatalf("expected error 'no space left on device', got nil")
}
}
func TestNewForInitedDir(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
@ -359,7 +393,7 @@ func TestSearchIndex(t *testing.T) {
},
}
for i, tt := range tests {
idx, ok := searchIndex(tt.names, tt.index)
idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index)
if idx != tt.widx {
t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
}
@ -380,7 +414,7 @@ func TestScanWalName(t *testing.T) {
{"0000000000000000-0000000000000000.snap", 0, 0, false},
}
for i, tt := range tests {
s, index, err := parseWalName(tt.str)
s, index, err := parseWALName(tt.str)
if g := err == nil; g != tt.wok {
t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
}
@ -583,7 +617,7 @@ func TestReleaseLockTo(t *testing.T) {
}
for i, l := range w.locks {
var lockIndex uint64
_, lockIndex, err = parseWalName(filepath.Base(l.Name()))
_, lockIndex, err = parseWALName(filepath.Base(l.Name()))
if err != nil {
t.Fatal(err)
}
@ -601,7 +635,7 @@ func TestReleaseLockTo(t *testing.T) {
if len(w.locks) != 1 {
t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
}
_, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
_, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name()))
if err != nil {
t.Fatal(err)
}
@ -802,3 +836,32 @@ func TestOpenOnTornWrite(t *testing.T) {
t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
}
}
func TestRenameFail(t *testing.T) {
p, err := ioutil.TempDir(os.TempDir(), "waltest")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(p)
oldSegmentSizeBytes := SegmentSizeBytes
defer func() {
SegmentSizeBytes = oldSegmentSizeBytes
}()
SegmentSizeBytes = math.MaxInt64
tp, terr := ioutil.TempDir(os.TempDir(), "waltest")
if terr != nil {
t.Fatal(terr)
}
os.RemoveAll(tp)
w := &WAL{
lg: zap.NewExample(),
dir: p,
}
w2, werr := w.renameWAL(tp)
if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory'
t.Fatalf("expected error, got %v", werr)
}
}