raft: do not panic on out of date compaction
parent
01cbcce8ba
commit
9df0e7715d
|
@ -808,7 +808,16 @@ func (s *EtcdServer) snapshot(snapi uint64, snapnodes []uint64) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panicf("store save should never fail: %v", err)
|
log.Panicf("store save should never fail: %v", err)
|
||||||
}
|
}
|
||||||
s.raftStorage.Compact(snapi, &raftpb.ConfState{Nodes: snapnodes}, d)
|
err = s.raftStorage.Compact(snapi, &raftpb.ConfState{Nodes: snapnodes}, d)
|
||||||
|
if err != nil {
|
||||||
|
// the snapshot was done asynchronously with the progress of raft.
|
||||||
|
// raft might have already got a newer snapshot and called compact.
|
||||||
|
if err == raft.ErrCompacted {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Panicf("etcdserver: unexpected compaction error %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := s.storage.Cut(); err != nil {
|
if err := s.storage.Cut(); err != nil {
|
||||||
log.Panicf("rotate wal file should never fail: %v", err)
|
log.Panicf("rotate wal file should never fail: %v", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -488,7 +488,13 @@ func TestCompaction(t *testing.T) {
|
||||||
raftLog.appliedTo(raftLog.committed)
|
raftLog.appliedTo(raftLog.committed)
|
||||||
|
|
||||||
for j := 0; j < len(tt.compact); j++ {
|
for j := 0; j < len(tt.compact); j++ {
|
||||||
storage.Compact(tt.compact[j], nil, nil)
|
err := storage.Compact(tt.compact[j], nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
if tt.wallow {
|
||||||
|
t.Errorf("#%d.%d allow = %t, want %t", i, j, false, tt.wallow)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
if len(raftLog.allEntries()) != tt.wleft[j] {
|
if len(raftLog.allEntries()) != tt.wleft[j] {
|
||||||
t.Errorf("#%d.%d len = %d, want %d", i, j, len(raftLog.allEntries()), tt.wleft[j])
|
t.Errorf("#%d.%d len = %d, want %d", i, j, len(raftLog.allEntries()), tt.wleft[j])
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,13 +18,13 @@ package raft
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
pb "github.com/coreos/etcd/raft/raftpb"
|
pb "github.com/coreos/etcd/raft/raftpb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ErrCompacted is returned by Storage.Entries when a requested
|
// ErrCompacted is returned by Storage.Entries/Compact when a requested
|
||||||
// index is unavailable because it predates the last snapshot.
|
// index is unavailable because it predates the last snapshot.
|
||||||
var ErrCompacted = errors.New("requested index is unavailable due to compaction")
|
var ErrCompacted = errors.New("requested index is unavailable due to compaction")
|
||||||
|
|
||||||
|
@ -154,9 +154,11 @@ func (ms *MemoryStorage) Compact(i uint64, cs *pb.ConfState, data []byte) error
|
||||||
ms.Lock()
|
ms.Lock()
|
||||||
defer ms.Unlock()
|
defer ms.Unlock()
|
||||||
offset := ms.snapshot.Metadata.Index
|
offset := ms.snapshot.Metadata.Index
|
||||||
if i <= offset || i > offset+uint64(len(ms.ents))-1 {
|
if i <= offset {
|
||||||
panic(fmt.Sprintf("compact %d out of bounds (%d, %d)", i, offset,
|
return ErrCompacted
|
||||||
offset+uint64(len(ms.ents))-1))
|
}
|
||||||
|
if i > offset+uint64(len(ms.ents))-1 {
|
||||||
|
log.Panicf("compact %d out of bound lastindex(%d)", i, offset+uint64(len(ms.ents))-1)
|
||||||
}
|
}
|
||||||
i -= offset
|
i -= offset
|
||||||
ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
|
ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
|
||||||
|
|
Loading…
Reference in New Issue