commit
5da481213e
|
@ -88,9 +88,9 @@ type Response struct {
|
|||
type Storage interface {
|
||||
// Save function saves ents and state to the underlying stable storage.
|
||||
// Save MUST block until st and ents are on stable storage.
|
||||
Save(st raftpb.HardState, ents []raftpb.Entry)
|
||||
Save(st raftpb.HardState, ents []raftpb.Entry) error
|
||||
// SaveSnap function saves snapshot to the underlying stable storage.
|
||||
SaveSnap(snap raftpb.Snapshot)
|
||||
SaveSnap(snap raftpb.Snapshot) error
|
||||
|
||||
// TODO: WAL should be able to control cut itself. After implement self-controled cut,
|
||||
// remove it in this interface.
|
||||
|
@ -314,8 +314,12 @@ func (s *EtcdServer) run() {
|
|||
}
|
||||
}
|
||||
|
||||
s.storage.Save(rd.HardState, rd.Entries)
|
||||
s.storage.SaveSnap(rd.Snapshot)
|
||||
if err := s.storage.Save(rd.HardState, rd.Entries); err != nil {
|
||||
log.Panicf("etcdserver: save state and entries error: %v", err)
|
||||
}
|
||||
if err := s.storage.SaveSnap(rd.Snapshot); err != nil {
|
||||
log.Panicf("etcdserver: create snapshot error: %v", err)
|
||||
}
|
||||
s.send(rd.Messages)
|
||||
|
||||
// TODO(bmizerany): do this in the background, but take
|
||||
|
@ -671,7 +675,9 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
|||
panic("TODO: this is bad, what do we do about it?")
|
||||
}
|
||||
s.node.Compact(snapi, snapnodes, d)
|
||||
s.storage.Cut()
|
||||
if err := s.storage.Cut(); err != nil {
|
||||
log.Panicf("etcdserver: rotate wal file error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func GetClusterFromPeers(urls []string) (*Cluster, error) {
|
||||
|
|
|
@ -1222,18 +1222,19 @@ type storageRecorder struct {
|
|||
recorder
|
||||
}
|
||||
|
||||
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) {
|
||||
func (p *storageRecorder) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
||||
p.record(action{name: "Save"})
|
||||
return nil
|
||||
}
|
||||
func (p *storageRecorder) Cut() error {
|
||||
p.record(action{name: "Cut"})
|
||||
return nil
|
||||
}
|
||||
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) {
|
||||
if raft.IsEmptySnap(st) {
|
||||
return
|
||||
func (p *storageRecorder) SaveSnap(st raftpb.Snapshot) error {
|
||||
if !raft.IsEmptySnap(st) {
|
||||
p.record(action{name: "SaveSnap"})
|
||||
}
|
||||
p.record(action{name: "SaveSnap"})
|
||||
return nil
|
||||
}
|
||||
|
||||
type readyNode struct {
|
||||
|
|
|
@ -52,11 +52,11 @@ func New(dir string) *Snapshotter {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) {
|
||||
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error {
|
||||
if raft.IsEmptySnap(snapshot) {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
s.save(&snapshot)
|
||||
return s.save(&snapshot)
|
||||
}
|
||||
|
||||
func (s *Snapshotter) save(snapshot *raftpb.Snapshot) error {
|
||||
|
|
14
wal/wal.go
14
wal/wal.go
|
@ -273,13 +273,17 @@ func (w *WAL) SaveState(s *raftpb.HardState) error {
|
|||
return w.encoder.encode(rec)
|
||||
}
|
||||
|
||||
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) {
|
||||
func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
||||
// TODO(xiangli): no more reference operator
|
||||
w.SaveState(&st)
|
||||
for i := range ents {
|
||||
w.SaveEntry(&ents[i])
|
||||
if err := w.SaveState(&st); err != nil {
|
||||
return err
|
||||
}
|
||||
w.Sync()
|
||||
for i := range ents {
|
||||
if err := w.SaveEntry(&ents[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return w.Sync()
|
||||
}
|
||||
|
||||
func (w *WAL) saveCrc(prevCrc uint32) error {
|
||||
|
|
Loading…
Reference in New Issue