wal: do not race reader and writer

release-2.1
Xiang Li 2015-03-05 20:19:17 -08:00
parent 6b9b695167
commit ab72c3ec88
2 changed files with 16 additions and 2 deletions

View File

@ -19,6 +19,7 @@ import (
"encoding/binary"
"hash"
"io"
"sync"
"github.com/coreos/etcd/pkg/crc"
"github.com/coreos/etcd/pkg/pbutil"
@ -27,7 +28,9 @@ import (
)
type decoder struct {
br *bufio.Reader
mu sync.Mutex
br *bufio.Reader
c io.Closer
crc hash.Hash32
}
@ -41,6 +44,9 @@ func newDecoder(rc io.ReadCloser) *decoder {
}
func (d *decoder) decode(rec *walpb.Record) error {
d.mu.Lock()
defer d.mu.Unlock()
rec.Reset()
l, err := readInt64(d.br)
if err != nil {

View File

@ -19,13 +19,16 @@ import (
"encoding/binary"
"hash"
"io"
"sync"
"github.com/coreos/etcd/pkg/crc"
"github.com/coreos/etcd/wal/walpb"
)
type encoder struct {
bw *bufio.Writer
mu sync.Mutex
bw *bufio.Writer
crc hash.Hash32
}
@ -37,6 +40,9 @@ func newEncoder(w io.Writer, prevCrc uint32) *encoder {
}
func (e *encoder) encode(rec *walpb.Record) error {
e.mu.Lock()
defer e.mu.Unlock()
e.crc.Write(rec.Data)
rec.Crc = e.crc.Sum32()
data, err := rec.Marshal()
@ -51,6 +57,8 @@ func (e *encoder) encode(rec *walpb.Record) error {
}
func (e *encoder) flush() error {
e.mu.Lock()
defer e.mu.Unlock()
return e.bw.Flush()
}