diff --git a/wal/wal.go b/wal/wal.go index 1bc2f2c4e..e63c7d752 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -23,6 +23,7 @@ import ( "os" "path" "reflect" + "sync" "time" "github.com/coreos/etcd/pkg/fileutil" @@ -69,6 +70,7 @@ type WAL struct { start walpb.Snapshot // snapshot to start reading decoder *decoder // decoder to decode records + mu sync.Mutex f *os.File // underlay file opened for appending, sync seq uint64 // sequence of the wal file currently used for writes enti uint64 // index of the last entry saved to the wal @@ -213,6 +215,9 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, all bool) (*WAL, error) { // TODO: maybe loose the checking of match. // After ReadAll, the WAL will be ready for appending new records. func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.Entry, err error) { + w.mu.Lock() + defer w.mu.Unlock() + rec := &walpb.Record{} decoder := w.decoder @@ -335,6 +340,9 @@ func (w *WAL) sync() error { // For example, if WAL is holding lock 1,2,3,4,5,6, ReleaseLockTo(4) will release // lock 1,2 but keep 3. ReleaseLockTo(5) will release 1,2,3 but keep 4. func (w *WAL) ReleaseLockTo(index uint64) error { + w.mu.Lock() + defer w.mu.Unlock() + var smaller int found := false @@ -370,6 +378,9 @@ func (w *WAL) ReleaseLockTo(index uint64) error { } func (w *WAL) Close() error { + w.mu.Lock() + defer w.mu.Unlock() + if w.f != nil { if err := w.sync(); err != nil { return err @@ -409,6 +420,9 @@ func (w *WAL) saveState(s *raftpb.HardState) error { } func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { + w.mu.Lock() + defer w.mu.Unlock() + // short cut, do not call sync if raft.IsEmptyHardState(st) && len(ents) == 0 { return nil @@ -436,6 +450,9 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error { } func (w *WAL) SaveSnapshot(e walpb.Snapshot) error { + w.mu.Lock() + defer w.mu.Unlock() + b := pbutil.MustMarshal(&e) rec := &walpb.Record{Type: snapshotType, Data: b} if err := w.encoder.encode(rec); err != nil {