From e15ce28168d5b5600f5e580ee63486cdd644d504 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Thu, 17 May 2018 10:48:57 -0700 Subject: [PATCH] wal: add missing logs, improve pipeline test coverage Signed-off-by: Gyuho Lee --- wal/file_pipeline.go | 6 +++- wal/file_pipeline_test.go | 73 +++++++++++++++++++++++++++++++++++++++ wal/repair.go | 33 +++++++++++------- wal/repair_test.go | 57 +++++++++++++++++++++++++++++- wal/util.go | 38 ++++++++++++-------- wal/wal.go | 44 ++++++++++++++--------- wal/wal_test.go | 71 ++++++++++++++++++++++++++++++++++--- 7 files changed, 273 insertions(+), 49 deletions(-) create mode 100644 wal/file_pipeline_test.go diff --git a/wal/file_pipeline.go b/wal/file_pipeline.go index f6d6433f6..664649a15 100644 --- a/wal/file_pipeline.go +++ b/wal/file_pipeline.go @@ -75,7 +75,11 @@ func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) { return nil, err } if err = fileutil.Preallocate(f.File, fp.size, true); err != nil { - plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + if fp.lg != nil { + fp.lg.Warn("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err)) + } else { + plog.Errorf("failed to allocate space when creating new wal file (%v)", err) + } f.Close() return nil, err } diff --git a/wal/file_pipeline_test.go b/wal/file_pipeline_test.go new file mode 100644 index 000000000..01560570e --- /dev/null +++ b/wal/file_pipeline_test.go @@ -0,0 +1,73 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package wal + +import ( + "io/ioutil" + "math" + "os" + "testing" + + "go.uber.org/zap" +) + +func TestFilePipeline(t *testing.T) { + tdir, err := ioutil.TempDir(os.TempDir(), "wal-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tdir) + + fp := newFilePipeline(zap.NewExample(), tdir, SegmentSizeBytes) + defer fp.Close() + + f, ferr := fp.Open() + if ferr != nil { + t.Fatal(ferr) + } + f.Close() +} + +func TestFilePipelineFailPreallocate(t *testing.T) { + tdir, err := ioutil.TempDir(os.TempDir(), "wal-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tdir) + + fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64) + defer fp.Close() + + f, ferr := fp.Open() + if f != nil || ferr == nil { // no space left on device + t.Fatal("expected error on invalid pre-allocate size, but no error") + } +} + +func TestFilePipelineFailLockFile(t *testing.T) { + tdir, err := ioutil.TempDir(os.TempDir(), "wal-test") + if err != nil { + t.Fatal(err) + } + os.RemoveAll(tdir) + + fp := newFilePipeline(zap.NewExample(), tdir, math.MaxInt64) + defer fp.Close() + + f, ferr := fp.Open() + if f != nil || ferr == nil { // no such file or directory + t.Fatal("expected error on invalid pre-allocate size, but no error") + } +} diff --git a/wal/repair.go b/wal/repair.go index a61db014e..dd62ac0de 100644 --- a/wal/repair.go +++ b/wal/repair.go @@ -34,6 +34,12 @@ func Repair(lg *zap.Logger, dirpath string) bool { } defer f.Close() + if lg != nil { + lg.Info("repairing", zap.String("path", f.Name())) + } else { + plog.Noticef("repairing %v", f.Name()) + } + rec := &walpb.Record{} decoder := newDecoder(f) for { @@ -55,18 +61,16 @@ func Repair(lg *zap.Logger, dirpath string) bool { continue case io.EOF: + if lg != nil { + lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.EOF)) + } return true case io.ErrUnexpectedEOF: - if lg != nil { - lg.Info("repairing", zap.String("path", f.Name())) - } else { - plog.Noticef("repairing %v", f.Name()) - } bf, bferr := os.Create(f.Name() + ".broken") if bferr != nil { if lg != nil { - lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken")) + lg.Warn("failed to create backup file", zap.String("path", f.Name()+".broken"), zap.Error(bferr)) } else { plog.Errorf("could not repair %v, failed to create backup file", f.Name()) } @@ -76,7 +80,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { if _, err = f.Seek(0, io.SeekStart); err != nil { if lg != nil { - lg.Warn("failed to read file", zap.String("path", f.Name())) + lg.Warn("failed to read file", zap.String("path", f.Name()), zap.Error(err)) } else { plog.Errorf("could not repair %v, failed to read file", f.Name()) } @@ -85,7 +89,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { if _, err = io.Copy(bf, f); err != nil { if lg != nil { - lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name())) + lg.Warn("failed to copy", zap.String("from", f.Name()+".broken"), zap.String("to", f.Name()), zap.Error(err)) } else { plog.Errorf("could not repair %v, failed to copy file", f.Name()) } @@ -94,25 +98,30 @@ func Repair(lg *zap.Logger, dirpath string) bool { if err = f.Truncate(lastOffset); err != nil { if lg != nil { - lg.Warn("failed to truncate", zap.String("path", f.Name())) + lg.Warn("failed to truncate", zap.String("path", f.Name()), zap.Error(err)) } else { plog.Errorf("could not repair %v, failed to truncate file", f.Name()) } return false } + if err = fileutil.Fsync(f.File); err != nil { if lg != nil { - lg.Warn("failed to fsync", zap.String("path", f.Name())) + lg.Warn("failed to fsync", zap.String("path", f.Name()), zap.Error(err)) } else { plog.Errorf("could not repair %v, failed to sync file", f.Name()) } return false } + + if lg != nil { + lg.Info("repaired", zap.String("path", f.Name()), zap.Error(io.ErrUnexpectedEOF)) + } return true default: if lg != nil { - lg.Warn("failed to repair", zap.Error(err)) + lg.Warn("failed to repair", zap.String("path", f.Name()), zap.Error(err)) } else { plog.Errorf("could not repair error (%v)", err) } @@ -123,7 +132,7 @@ func Repair(lg *zap.Logger, dirpath string) bool { // openLast opens the last wal file for read and write. func openLast(lg *zap.Logger, dirpath string) (*fileutil.LockedFile, error) { - names, err := readWalNames(lg, dirpath) + names, err := readWALNames(lg, dirpath) if err != nil { return nil, err } diff --git a/wal/repair_test.go b/wal/repair_test.go index 3deab99da..64effeb15 100644 --- a/wal/repair_test.go +++ b/wal/repair_test.go @@ -49,6 +49,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect t.Fatal(err) } defer os.RemoveAll(p) + // create WAL w, err := Create(zap.NewExample(), p, nil) defer func() { @@ -90,7 +91,7 @@ func testRepair(t *testing.T, ents [][]raftpb.Entry, corrupt corruptFunc, expect // repair the wal if ok := Repair(zap.NewExample(), p); !ok { - t.Fatalf("fix = %t, want %t", ok, true) + t.Fatalf("'Repair' returned '%v', want 'true'", ok) } // read it back @@ -184,3 +185,57 @@ func TestRepairWriteTearMiddle(t *testing.T) { } testRepair(t, ents, corruptf, 1) } + +func TestRepairFailDeleteDir(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + w, err := Create(zap.NewExample(), p, nil) + if err != nil { + t.Fatal(err) + } + + oldSegmentSizeBytes := SegmentSizeBytes + SegmentSizeBytes = 64 + defer func() { + SegmentSizeBytes = oldSegmentSizeBytes + }() + for _, es := range makeEnts(50) { + if err = w.Save(raftpb.HardState{}, es); err != nil { + t.Fatal(err) + } + } + + _, serr := w.tail().Seek(0, io.SeekCurrent) + if serr != nil { + t.Fatal(serr) + } + w.Close() + + f, err := openLast(zap.NewExample(), p) + if err != nil { + t.Fatal(err) + } + if terr := f.Truncate(20); terr != nil { + t.Fatal(err) + } + f.Close() + + w, err = Open(zap.NewExample(), p, walpb.Snapshot{}) + if err != nil { + t.Fatal(err) + } + _, _, _, err = w.ReadAll() + if err != io.ErrUnexpectedEOF { + t.Fatalf("err = %v, want error %v", err, io.ErrUnexpectedEOF) + } + w.Close() + + os.RemoveAll(p) + if Repair(zap.NewExample(), p) { + t.Fatal("expect 'Repair' fail on unexpected directory deletion") + } +} diff --git a/wal/util.go b/wal/util.go index 03cc242da..40065ec64 100644 --- a/wal/util.go +++ b/wal/util.go @@ -20,15 +20,15 @@ import ( "strings" "github.com/coreos/etcd/pkg/fileutil" + "go.uber.org/zap" ) -var ( - badWalName = errors.New("bad wal name") -) +var errBadWALName = errors.New("bad wal name") -func Exist(dirpath string) bool { - names, err := fileutil.ReadDir(dirpath) +// Exist returns true if there are any files in a given directory. +func Exist(dir string) bool { + names, err := fileutil.ReadDir(dir) if err != nil { return false } @@ -38,12 +38,16 @@ func Exist(dirpath string) bool { // searchIndex returns the last array index of names whose raft index section is // equal to or smaller than the given index. // The given names MUST be sorted. -func searchIndex(names []string, index uint64) (int, bool) { +func searchIndex(lg *zap.Logger, names []string, index uint64) (int, bool) { for i := len(names) - 1; i >= 0; i-- { name := names[i] - _, curIndex, err := parseWalName(name) + _, curIndex, err := parseWALName(name) if err != nil { - plog.Panicf("parse correct name should never fail: %v", err) + if lg != nil { + lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err)) + } else { + plog.Panicf("parse correct name should never fail: %v", err) + } } if index >= curIndex { return i, true @@ -54,12 +58,16 @@ func searchIndex(names []string, index uint64) (int, bool) { // names should have been sorted based on sequence number. // isValidSeq checks whether seq increases continuously. -func isValidSeq(names []string) bool { +func isValidSeq(lg *zap.Logger, names []string) bool { var lastSeq uint64 for _, name := range names { - curSeq, _, err := parseWalName(name) + curSeq, _, err := parseWALName(name) if err != nil { - plog.Panicf("parse correct name should never fail: %v", err) + if lg != nil { + lg.Panic("failed to parse WAL file name", zap.String("path", name), zap.Error(err)) + } else { + plog.Panicf("parse correct name should never fail: %v", err) + } } if lastSeq != 0 && lastSeq != curSeq-1 { return false @@ -69,7 +77,7 @@ func isValidSeq(names []string) bool { return true } -func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) { +func readWALNames(lg *zap.Logger, dirpath string) ([]string, error) { names, err := fileutil.ReadDir(dirpath) if err != nil { return nil, err @@ -84,7 +92,7 @@ func readWalNames(lg *zap.Logger, dirpath string) ([]string, error) { func checkWalNames(lg *zap.Logger, names []string) []string { wnames := make([]string, 0) for _, name := range names { - if _, _, err := parseWalName(name); err != nil { + if _, _, err := parseWALName(name); err != nil { // don't complain about left over tmp files if !strings.HasSuffix(name, ".tmp") { if lg != nil { @@ -103,9 +111,9 @@ func checkWalNames(lg *zap.Logger, names []string) []string { return wnames } -func parseWalName(str string) (seq, index uint64, err error) { +func parseWALName(str string) (seq, index uint64, err error) { if !strings.HasSuffix(str, ".wal") { - return 0, 0, badWalName + return 0, 0, errBadWALName } _, err = fmt.Sscanf(str, "%016x-%016x.wal", &seq, &index) return seq, index, err diff --git a/wal/wal.go b/wal/wal.go index 01d1fbdae..49d450584 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -172,7 +172,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { return nil, err } - if w, err = w.renameWal(tmpdirpath); err != nil { + if w, err = w.renameWAL(tmpdirpath); err != nil { if lg != nil { lg.Warn( "failed to rename the temporary WAL directory", @@ -223,7 +223,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) { return w, nil } -func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { +func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) { if err := os.RemoveAll(w.dir); err != nil { return nil, err } @@ -235,7 +235,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { // process holds the lock. if err := os.Rename(tmpdirpath, w.dir); err != nil { if _, ok := err.(*os.LinkError); ok { - return w.renameWalUnlock(tmpdirpath) + return w.renameWALUnlock(tmpdirpath) } return nil, err } @@ -245,23 +245,24 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) { return w, err } -func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) { +func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) { // rename of directory with locked files doesn't work on windows/cifs; // close the WAL to release the locks so the directory can be renamed. if w.lg != nil { w.lg.Info( - "releasing flock to rename", + "closing WAL to release flock and retry directory renaming", zap.String("from", tmpdirpath), zap.String("to", w.dir), ) } else { plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir) } - w.Close() + if err := os.Rename(tmpdirpath, w.dir); err != nil { return nil, err } + // reopen and relock newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{}) if oerr != nil { @@ -298,13 +299,13 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err } func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { - names, err := readWalNames(lg, dirpath) + names, err := readWALNames(lg, dirpath) if err != nil { return nil, err } - nameIndex, ok := searchIndex(names, snap.Index) - if !ok || !isValidSeq(names[nameIndex:]) { + nameIndex, ok := searchIndex(lg, names, snap.Index) + if !ok || !isValidSeq(lg, names[nameIndex:]) { return nil, ErrFileNotFound } @@ -350,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool // write reuses the file descriptors from read; don't close so // WAL can append without dropping the file lock w.readClose = nil - if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil { + if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil { closer() return nil, err } @@ -386,14 +387,17 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. ents = append(ents[:e.Index-w.start.Index-1], e) } w.enti = e.Index + case stateType: state = mustUnmarshalState(rec.Data) + case metadataType: if metadata != nil && !bytes.Equal(metadata, rec.Data) { state.Reset() return nil, state, nil, ErrMetadataConflict } metadata = rec.Data + case crcType: crc := decoder.crc.Sum32() // current crc of decoder must match the crc of the record. @@ -403,6 +407,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. return nil, state, nil, ErrCRCMismatch } decoder.updateCRC(rec.Crc) + case snapshotType: var snap walpb.Snapshot pbutil.MustUnmarshal(&snap, rec.Data) @@ -413,6 +418,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } match = true } + default: state.Reset() return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type) @@ -483,9 +489,11 @@ func (w *WAL) cut() error { if serr != nil { return serr } + if err := w.tail().Truncate(off); err != nil { return err } + if err := w.sync(); err != nil { return err } @@ -505,15 +513,19 @@ func (w *WAL) cut() error { if err != nil { return err } + if err = w.saveCrc(prevCrc); err != nil { return err } + if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil { return err } + if err = w.saveState(&w.state); err != nil { return err } + // atomically move temp wal file to wal file if err = w.sync(); err != nil { return err @@ -550,7 +562,7 @@ func (w *WAL) cut() error { } if w.lg != nil { - + w.lg.Info("created a new WAL segment", zap.String("path", fpath)) } else { plog.Infof("segmented wal file %v is created", fpath) } @@ -578,8 +590,8 @@ func (w *WAL) sync() error { plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration) } } - syncDurations.Observe(took.Seconds()) + return err } @@ -597,9 +609,8 @@ func (w *WAL) ReleaseLockTo(index uint64) error { var smaller int found := false - for i, l := range w.locks { - _, lockIndex, err := parseWalName(filepath.Base(l.Name())) + _, lockIndex, err := parseWALName(filepath.Base(l.Name())) if err != nil { return err } @@ -631,6 +642,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error { return nil } +// Close closes the current WAL file and directory. func (w *WAL) Close() error { w.mu.Lock() defer w.mu.Unlock() @@ -651,7 +663,7 @@ func (w *WAL) Close() error { } if err := l.Close(); err != nil { if w.lg != nil { - w.lg.Error("failed to close WAL", zap.Error(err)) + w.lg.Warn("failed to close WAL", zap.Error(err)) } else { plog.Errorf("failed to unlock during closing wal: %s", err) } @@ -750,7 +762,7 @@ func (w *WAL) seq() uint64 { if t == nil { return 0 } - seq, _, err := parseWalName(filepath.Base(t.Name())) + seq, _, err := parseWALName(filepath.Base(t.Name())) if err != nil { if w.lg != nil { w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err)) diff --git a/wal/wal_test.go b/wal/wal_test.go index 00ccab68b..b2c84c6ea 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -18,6 +18,7 @@ import ( "bytes" "io" "io/ioutil" + "math" "os" "path/filepath" "reflect" @@ -85,6 +86,39 @@ func TestNew(t *testing.T) { } } +func TestCreateFailFromPollutedDir(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + ioutil.WriteFile(filepath.Join(p, "test.wal"), []byte("data"), os.ModeTemporary) + + _, err = Create(zap.NewExample(), p, []byte("data")) + if err != os.ErrExist { + t.Fatalf("expected %v, got %v", os.ErrExist, err) + } +} + +func TestCreateFailFromNoSpaceLeft(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + oldSegmentSizeBytes := SegmentSizeBytes + defer func() { + SegmentSizeBytes = oldSegmentSizeBytes + }() + SegmentSizeBytes = math.MaxInt64 + + _, err = Create(zap.NewExample(), p, []byte("data")) + if err == nil { // no space left on device + t.Fatalf("expected error 'no space left on device', got nil") + } +} + func TestNewForInitedDir(t *testing.T) { p, err := ioutil.TempDir(os.TempDir(), "waltest") if err != nil { @@ -359,7 +393,7 @@ func TestSearchIndex(t *testing.T) { }, } for i, tt := range tests { - idx, ok := searchIndex(tt.names, tt.index) + idx, ok := searchIndex(zap.NewExample(), tt.names, tt.index) if idx != tt.widx { t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx) } @@ -380,7 +414,7 @@ func TestScanWalName(t *testing.T) { {"0000000000000000-0000000000000000.snap", 0, 0, false}, } for i, tt := range tests { - s, index, err := parseWalName(tt.str) + s, index, err := parseWALName(tt.str) if g := err == nil; g != tt.wok { t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok) } @@ -583,7 +617,7 @@ func TestReleaseLockTo(t *testing.T) { } for i, l := range w.locks { var lockIndex uint64 - _, lockIndex, err = parseWalName(filepath.Base(l.Name())) + _, lockIndex, err = parseWALName(filepath.Base(l.Name())) if err != nil { t.Fatal(err) } @@ -601,7 +635,7 @@ func TestReleaseLockTo(t *testing.T) { if len(w.locks) != 1 { t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1) } - _, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name())) + _, lockIndex, err := parseWALName(filepath.Base(w.locks[0].Name())) if err != nil { t.Fatal(err) } @@ -802,3 +836,32 @@ func TestOpenOnTornWrite(t *testing.T) { t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents)) } } + +func TestRenameFail(t *testing.T) { + p, err := ioutil.TempDir(os.TempDir(), "waltest") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(p) + + oldSegmentSizeBytes := SegmentSizeBytes + defer func() { + SegmentSizeBytes = oldSegmentSizeBytes + }() + SegmentSizeBytes = math.MaxInt64 + + tp, terr := ioutil.TempDir(os.TempDir(), "waltest") + if terr != nil { + t.Fatal(terr) + } + os.RemoveAll(tp) + + w := &WAL{ + lg: zap.NewExample(), + dir: p, + } + w2, werr := w.renameWAL(tp) + if w2 != nil || werr == nil { // os.Rename should fail from 'no such file or directory' + t.Fatalf("expected error, got %v", werr) + } +}