wal: refactor
parent
77fbd2610c
commit
69f2d5c590
|
@ -48,6 +48,10 @@ At a later time a WAL can be opened at a particular raft index:
|
||||||
w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
|
w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
|
||||||
...
|
...
|
||||||
|
|
||||||
|
The raft index must have been written to the WAL. When opening without a
|
||||||
|
snapshot the raft index should always be 0. When opening with a snapshot
|
||||||
|
the raft index should be the index of the last entry covered by the snapshot.
|
||||||
|
|
||||||
Additional items cannot be Saved to this WAL until all of the items from 0 to
|
Additional items cannot be Saved to this WAL until all of the items from 0 to
|
||||||
the end of the WAL are read first:
|
the end of the WAL are read first:
|
||||||
|
|
||||||
|
|
11
wal/wal.go
11
wal/wal.go
|
@ -60,8 +60,8 @@ type WAL struct {
|
||||||
decoder *decoder // decoder to decode records
|
decoder *decoder // decoder to decode records
|
||||||
|
|
||||||
f *os.File // underlay file opened for appending, sync
|
f *os.File // underlay file opened for appending, sync
|
||||||
seq int64 // the sequence of the current writting wal file
|
seq int64 // sequence of the wal file currently used for writes
|
||||||
enti int64 // index of the last entry that has been saved to wal
|
enti int64 // index of the last entry saved to the wal
|
||||||
encoder *encoder // encoder to encode records
|
encoder *encoder // encoder to encode records
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ func Create(dirpath string) (*WAL, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// OpenAtIndex opens the WAL at the given index.
|
// OpenAtIndex opens the WAL at the given index.
|
||||||
// There MUST be an entry with given index written to WAL before.
|
// The index MUST have been previously committed to the WAL.
|
||||||
// The returned WAL is ready to read and the first record will be the given
|
// The returned WAL is ready to read and the first record will be the given
|
||||||
// index. The WAL cannot be appended to before reading out all of its
|
// index. The WAL cannot be appended to before reading out all of its
|
||||||
// previous records.
|
// previous records.
|
||||||
|
@ -205,18 +205,17 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err
|
||||||
|
|
||||||
// Cut closes current file written and creates a new one ready to append.
|
// Cut closes current file written and creates a new one ready to append.
|
||||||
func (w *WAL) Cut() error {
|
func (w *WAL) Cut() error {
|
||||||
log.Printf("wal.cut index=%d", w.enti+1)
|
|
||||||
|
|
||||||
// create a new wal file with name sequence + 1
|
// create a new wal file with name sequence + 1
|
||||||
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
||||||
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Sync()
|
w.Sync()
|
||||||
w.f.Close()
|
w.f.Close()
|
||||||
|
|
||||||
|
log.Printf("wal.cut index=%d prevfile=%s curfile=%s", w.enti, w.f.Name(), f.Name())
|
||||||
|
|
||||||
// update writer and save the previous crc
|
// update writer and save the previous crc
|
||||||
w.f = f
|
w.f = f
|
||||||
w.seq++
|
w.seq++
|
||||||
|
|
Loading…
Reference in New Issue