diff --git a/wal/decoder.go b/wal/decoder.go index 8e16eb1b3..a836ccd68 100644 --- a/wal/decoder.go +++ b/wal/decoder.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "hash" "io" + "sync" "github.com/coreos/etcd/pkg/crc" "github.com/coreos/etcd/pkg/pbutil" @@ -27,7 +28,9 @@ import ( ) type decoder struct { - br *bufio.Reader + mu sync.Mutex + br *bufio.Reader + c io.Closer crc hash.Hash32 } @@ -41,6 +44,9 @@ func newDecoder(rc io.ReadCloser) *decoder { } func (d *decoder) decode(rec *walpb.Record) error { + d.mu.Lock() + defer d.mu.Unlock() + rec.Reset() l, err := readInt64(d.br) if err != nil { diff --git a/wal/encoder.go b/wal/encoder.go index 3baa766cf..ca0df44a0 100644 --- a/wal/encoder.go +++ b/wal/encoder.go @@ -19,13 +19,16 @@ import ( "encoding/binary" "hash" "io" + "sync" "github.com/coreos/etcd/pkg/crc" "github.com/coreos/etcd/wal/walpb" ) type encoder struct { - bw *bufio.Writer + mu sync.Mutex + bw *bufio.Writer + crc hash.Hash32 } @@ -37,6 +40,9 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder { } func (e *encoder) encode(rec *walpb.Record) error { + e.mu.Lock() + defer e.mu.Unlock() + e.crc.Write(rec.Data) rec.Crc = e.crc.Sum32() data, err := rec.Marshal() @@ -51,6 +57,8 @@ func (e *encoder) encode(rec *walpb.Record) error { } func (e *encoder) flush() error { + e.mu.Lock() + defer e.mu.Unlock() return e.bw.Flush() }