diff --git a/mvcc/backend/backend_test.go b/mvcc/backend/backend_test.go index 664579bba..9bdec5c48 100644 --- a/mvcc/backend/backend_test.go +++ b/mvcc/backend/backend_test.go @@ -250,6 +250,56 @@ func TestBackendWriteback(t *testing.T) { } } +// TestBackendWritebackForEach checks that partially written / buffered +// data is visited in the same order as fully committed data. +func TestBackendWritebackForEach(t *testing.T) { + b, tmpPath := NewTmpBackend(time.Hour, 10000) + defer cleanup(b, tmpPath) + + tx := b.BatchTx() + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + for i := 0; i < 5; i++ { + k := []byte(fmt.Sprintf("%04d", i)) + tx.UnsafePut([]byte("key"), k, []byte("bar")) + } + tx.Unlock() + + // writeback + b.ForceCommit() + + tx.Lock() + tx.UnsafeCreateBucket([]byte("key")) + for i := 5; i < 20; i++ { + k := []byte(fmt.Sprintf("%04d", i)) + tx.UnsafePut([]byte("key"), k, []byte("bar")) + } + tx.Unlock() + + seq := "" + getSeq := func(k, v []byte) error { + seq += string(k) + return nil + } + rtx := b.ReadTx() + rtx.Lock() + rtx.UnsafeForEach([]byte("key"), getSeq) + rtx.Unlock() + + partialSeq := seq + + seq = "" + b.ForceCommit() + + tx.Lock() + tx.UnsafeForEach([]byte("key"), getSeq) + tx.Unlock() + + if seq != partialSeq { + t.Fatalf("expected %q, got %q", seq, partialSeq) + } +} + func cleanup(b Backend, path string) { b.Close() os.Remove(path) diff --git a/mvcc/backend/read_tx.go b/mvcc/backend/read_tx.go index a5ceeaeb7..9101cfd2a 100644 --- a/mvcc/backend/read_tx.go +++ b/mvcc/backend/read_tx.go @@ -70,21 +70,24 @@ func (rt *readTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][] func (rt *readTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error { dups := make(map[string]struct{}) - f1 := func(k, v []byte) error { + getDups := func(k, v []byte) error { dups[string(k)] = struct{}{} - return visitor(k, v) + return nil } - f2 := func(k, v []byte) error { + visitNoDup := func(k, v []byte) error { if _, ok := dups[string(k)]; ok { return nil } return visitor(k, v) } - if err := rt.buf.ForEach(bucketName, f1); err != nil { + if err := rt.buf.ForEach(bucketName, getDups); err != nil { return err } rt.txmu.Lock() - err := unsafeForEach(rt.tx, bucketName, f2) + err := unsafeForEach(rt.tx, bucketName, visitNoDup) rt.txmu.Unlock() - return err + if err != nil { + return err + } + return rt.buf.ForEach(bucketName, visitor) }