etcdserver: separate apply and raft routine

release-2.1
Xiang Li 2015-03-05 13:03:04 -08:00
parent 6b9b695167
commit d015610da5
4 changed files with 132 additions and 117 deletions

etcdserver/raft.go

@ -20,6 +20,7 @@ import (
"log"
"os"
"sort"
"sync/atomic"
"time"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@ -59,11 +60,25 @@ type RaftTimer interface {
    Term() uint64
}

+// apply contains the entries and snapshot to be applied.
+// After applying all the items, the application needs
+// to send a notification to the done chan.
+type apply struct {
+    entries  []raftpb.Entry
+    snapshot raftpb.Snapshot
+    done     chan struct{}
+}
+
type raftNode struct {
    raft.Node

    // config
    snapCount uint64 // number of entries to trigger a snapshot

+    // a chan to send out apply
+    applyc chan apply
+
+    // TODO: remove the etcdserver related logic from raftNode
+    // TODO: add a state machine interface to apply the committed entries
+    // and do snapshot/recover
+    s *EtcdServer
+
    // utility
    ticker <-chan time.Time
@ -81,6 +96,77 @@ type raftNode struct {
    lead uint64
}

+func (r *raftNode) run() {
+    var syncC <-chan time.Time
+
+    defer r.stop()
+    for {
+        select {
+        case <-r.ticker:
+            r.Tick()
+        case rd := <-r.Ready():
+            if rd.SoftState != nil {
+                atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+                if rd.RaftState == raft.StateLeader {
+                    syncC = r.s.SyncTicker
+                    // TODO: remove the nil checking
+                    // current test utility does not provide the stats
+                    if r.s.stats != nil {
+                        r.s.stats.BecomeLeader()
+                    }
+                } else {
+                    syncC = nil
+                }
+            }
+
+            apply := apply{
+                entries:  rd.CommittedEntries,
+                snapshot: rd.Snapshot,
+                done:     make(chan struct{}),
+            }
+
+            select {
+            case r.applyc <- apply:
+            case <-r.s.done:
+                return
+            }
+
+            if !raft.IsEmptySnap(rd.Snapshot) {
+                if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
+                    log.Fatalf("etcdraft: save snapshot error: %v", err)
+                }
+                r.raftStorage.ApplySnapshot(rd.Snapshot)
+                log.Printf("etcdraft: applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
+            }
+            if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+                log.Fatalf("etcdraft: save state and entries error: %v", err)
+            }
+            r.raftStorage.Append(rd.Entries)
+
+            r.s.send(rd.Messages)
+
+            <-apply.done
+            r.Advance()
+        case <-syncC:
+            r.s.sync(defaultSyncTimeout)
+        case <-r.s.done:
+            return
+        }
+    }
+}
+
+func (r *raftNode) apply() chan apply {
+    return r.applyc
+}
+
+func (r *raftNode) stop() {
+    r.Stop()
+    r.transport.Stop()
+    if err := r.storage.Close(); err != nil {
+        log.Panicf("etcdraft: close storage error: %v", err)
+    }
+}

// for testing
func (r *raftNode) pauseSending() {
    p := r.transport.(rafthttp.Pausable)
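
The heart of the change is above: raftNode.run hands each Ready batch to the server through applyc, persists raft state while the entries are being applied, and blocks on apply.done before calling Advance. Below is a minimal, self-contained Go sketch of that handoff pattern, with toy stand-ins (entry, applyMsg, the printed output are illustrative, not etcd's types); the unbuffered done channel doubles as a barrier, so neither routine can outrun the other.

package main

import "fmt"

type entry struct{ index uint64 }

// applyMsg mirrors the apply struct above: a batch of work for the
// apply routine plus a done channel the raft routine waits on.
type applyMsg struct {
	entries []entry
	done    chan struct{}
}

func main() {
	applyc := make(chan applyMsg)
	stopc := make(chan struct{})

	// apply routine: consume committed entries, then signal done so the
	// raft routine may advance to the next batch.
	go func() {
		for {
			select {
			case ap := <-applyc:
				for _, e := range ap.entries {
					fmt.Println("applied entry", e.index)
				}
				ap.done <- struct{}{} // blocks until the raft routine receives
			case <-stopc:
				return
			}
		}
	}()

	// raft routine: hand off a batch, persist it (elided here), then wait
	// for the apply routine before advancing, as raftNode.run does above.
	for i := uint64(1); i <= 3; i++ {
		ap := applyMsg{entries: []entry{{index: i}}, done: make(chan struct{})}
		applyc <- ap
		// ... save hard state, entries, and snapshot to disk here ...
		<-ap.done
	}
	close(stopc)
}

Because done is unbuffered, the apply routine's send cannot complete until the raft routine has finished its disk writes and reached the receive — the ordering the comment in server.go below relies on.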

etcdserver/server.go

@ -110,7 +110,8 @@ type Server interface {
// EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
-    cfg *ServerConfig
+    cfg       *ServerConfig
+    snapCount uint64

    r raftNode
@ -237,12 +238,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
    lstats := stats.NewLeaderStats(id.String())
    srv := &EtcdServer{
-        cfg:    cfg,
-        errorc: make(chan error, 1),
-        store:  st,
+        cfg:       cfg,
+        snapCount: cfg.SnapCount,
+        errorc:    make(chan error, 1),
+        store:     st,
        r: raftNode{
            Node:        n,
-            snapCount:   cfg.SnapCount,
            ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
            raftStorage: s,
            storage:     NewStorage(w, ss),
@ -280,9 +281,9 @@ func (s *EtcdServer) Start() {
// modify a server's fields after it has been sent to Start.
// This function is just used for testing.
func (s *EtcdServer) start() {
-    if s.r.snapCount == 0 {
+    if s.snapCount == 0 {
        log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-        s.r.snapCount = DefaultSnapCount
+        s.snapCount = DefaultSnapCount
    }
    s.w = wait.New()
    s.done = make(chan struct{})
@ -333,73 +334,37 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

func (s *EtcdServer) run() {
-    var syncC <-chan time.Time
-    var shouldstop bool
    // load initial state from raft storage
    snap, err := s.r.raftStorage.Snapshot()
    if err != nil {
        log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
    }
-    // snapi indicates the index of the last submitted snapshot request
-    snapi := snap.Metadata.Index
-    appliedi := snap.Metadata.Index
    confState := snap.Metadata.ConfState
+    snapi := snap.Metadata.Index
+    appliedi := snapi
+    // TODO: get rid of the raft initialization in etcd server
+    s.r.s = s
+    s.r.applyc = make(chan apply)
+    go s.r.run()
+    defer close(s.done)
-    defer func() {
-        s.r.Stop()
-        s.r.transport.Stop()
-        if err := s.r.storage.Close(); err != nil {
-            log.Panicf("etcdserver: close storage error: %v", err)
-        }
-        close(s.done)
-    }()
-    // TODO: make raft loop a method on raftNode
+    var shouldstop bool
    for {
        select {
-        case <-s.r.ticker:
-            s.r.Tick()
-        case rd := <-s.r.Ready():
-            if rd.SoftState != nil {
-                atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead)
-                if rd.RaftState == raft.StateLeader {
-                    syncC = s.SyncTicker
-                    // TODO: remove the nil checking
-                    // current test utility does not provide the stats
-                    if s.stats != nil {
-                        s.stats.BecomeLeader()
-                    }
-                } else {
-                    syncC = nil
+        case apply := <-s.r.apply():
+            // apply snapshot
+            if !raft.IsEmptySnap(apply.snapshot) {
+                if apply.snapshot.Metadata.Index <= appliedi {
+                    log.Panicf("etcdserver: snapshot index [%d] should > appliedi[%d] + 1",
+                        apply.snapshot.Metadata.Index, appliedi)
                }
-            }
-            // apply snapshot to storage if it is more updated than current snapi
-            if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi {
-                if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil {
-                    log.Fatalf("etcdserver: save snapshot error: %v", err)
-                }
-                s.r.raftStorage.ApplySnapshot(rd.Snapshot)
-                snapi = rd.Snapshot.Metadata.Index
-                log.Printf("etcdserver: saved incoming snapshot at index %d", snapi)
-            }
-            if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil {
-                log.Fatalf("etcdserver: save state and entries error: %v", err)
-            }
-            s.r.raftStorage.Append(rd.Entries)
-            s.send(rd.Messages)
-            // recover from snapshot if it is more updated than current applied
-            if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
-                if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
+                if err := s.store.Recovery(apply.snapshot.Data); err != nil {
                    log.Panicf("recovery store error: %v", err)
                }
-                // It avoids snapshot recovery overwriting newer cluster and
+                // Avoid snapshot recovery overwriting newer cluster and
                // transport setting, which may block the communication.
-                if s.Cluster.index < rd.Snapshot.Metadata.Index {
+                if s.Cluster.index < apply.snapshot.Metadata.Index {
                    s.Cluster.Recover()
                    // recover raft transport
                    s.r.transport.RemoveAllPeers()
@ -411,38 +376,38 @@ func (s *EtcdServer) run() {
                    }
                }
-                appliedi = rd.Snapshot.Metadata.Index
-                confState = rd.Snapshot.Metadata.ConfState
+                appliedi = apply.snapshot.Metadata.Index
+                snapi = appliedi
+                confState = apply.snapshot.Metadata.ConfState
+                log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
            }
-            // TODO(bmizerany): do this in the background, but take
-            // care to apply entries in a single goroutine, and not
-            // race them.
-            if len(rd.CommittedEntries) != 0 {
-                firsti := rd.CommittedEntries[0].Index
+            // apply entries
+            if len(apply.entries) != 0 {
+                firsti := apply.entries[0].Index
                if firsti > appliedi+1 {
                    log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
                }
                var ents []raftpb.Entry
-                if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
-                    ents = rd.CommittedEntries[appliedi+1-firsti:]
+                if appliedi+1-firsti < uint64(len(apply.entries)) {
+                    ents = apply.entries[appliedi+1-firsti:]
                }
                if len(ents) > 0 {
-                    if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-                        go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
-                    }
+                    if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
+                        go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
+                    }
                }
            }
-            s.r.Advance()
+            // Wait for the raft routine to finish the disk writes before triggering a
+            // snapshot, or the applied index might be greater than the last index in
+            // raft storage, since the raft routine might be slower than the apply routine.
+            apply.done <- struct{}{}
-            if appliedi-snapi > s.r.snapCount {
+            // trigger snapshot
+            if appliedi-snapi > s.snapCount {
                log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
                s.snapshot(appliedi, confState)
                snapi = appliedi
            }
-        case <-syncC:
-            s.sync(defaultSyncTimeout)
        case err := <-s.errorc:
            log.Printf("etcdserver: %s", err)
            log.Printf("etcdserver: the data-dir used by this member must be removed.")
@ -451,6 +416,7 @@ func (s *EtcdServer) run() {
            return
        }
    }
+    // TODO: wait for the stop of raft node routine?
}

// Stop stops the server gracefully, and shuts down the running goroutine.
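
Moving snapCount from raftNode up to EtcdServer goes with the split: the apply routine, not the raft routine, now decides when to snapshot, by comparing the applied index against the last snapshot index. A toy illustration of that trigger arithmetic, with an arbitrary snapCount of 10 (etcd takes the real value from cfg.SnapCount):

package main

import "fmt"

func main() {
	const snapCount = 10 // arbitrary for illustration
	var appliedi, snapi uint64

	for i := uint64(1); i <= 25; i++ {
		appliedi = i // pretend entry i was just applied
		if appliedi-snapi > snapCount {
			fmt.Printf("trigger snapshot: applied=%d lastsnap=%d\n", appliedi, snapi)
			snapi = appliedi
		}
	}
}

With these values the snapshot fires at applied index 11 and again at 22, exactly one trigger per snapCount+1 applied entries.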

etcdserver/server_test.go

@ -738,9 +738,9 @@ func TestTriggerSnap(t *testing.T) {
    st := &storeRecorder{}
    p := &storageRecorder{}
    srv := &EtcdServer{
+        snapCount: uint64(snapc),
        r: raftNode{
            Node:        newNodeCommitter(),
-            snapCount:   uint64(snapc),
            raftStorage: raft.NewMemoryStorage(),
            storage:     p,
            transport:   &nopTransporter{},
@ -801,41 +801,6 @@ func TestRecvSnapshot(t *testing.T) {
    }
}

-// TestRecvSlowSnapshot tests that slow snapshot will not be applied
-// to store. The case could happen when server compacts the log and
-// raft returns the compacted snapshot.
-func TestRecvSlowSnapshot(t *testing.T) {
-    n := newReadyNode()
-    st := &storeRecorder{}
-    cl := newCluster("abc")
-    cl.SetStore(store.New())
-    s := &EtcdServer{
-        r: raftNode{
-            Node:        n,
-            storage:     &storageRecorder{},
-            raftStorage: raft.NewMemoryStorage(),
-            transport:   &nopTransporter{},
-        },
-        store:   st,
-        Cluster: cl,
-    }
-    s.start()
-    n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
-    // make goroutines move forward to receive snapshot
-    testutil.ForceGosched()
-    action := st.Action()
-
-    n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}}
-    // make goroutines move forward to receive snapshot
-    testutil.ForceGosched()
-    s.Stop()
-
-    if g := st.Action(); !reflect.DeepEqual(g, action) {
-        t.Errorf("store action = %v, want %v", g, action)
-    }
-}

// TestApplySnapshotAndCommittedEntries tests that server applies snapshot
// first and then committed entries.
func TestApplySnapshotAndCommittedEntries(t *testing.T) {
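
TestRecvSlowSnapshot is deleted above, presumably because the new apply path no longer silently ignores a snapshot that lags the applied index — the guard in run() treats that as a fatal error. A small standalone sketch of the new contract, with made-up indexes (checkSnap is a hypothetical helper, not etcd code):

package main

import "log"

// checkSnap mirrors the guard in run() above: an incoming snapshot
// must be ahead of the applied index, otherwise it is a hard error.
func checkSnap(snapIndex, appliedi uint64) {
	if snapIndex <= appliedi {
		log.Panicf("etcdserver: snapshot index [%d] should > appliedi[%d] + 1",
			snapIndex, appliedi)
	}
}

func main() {
	checkSnap(5, 3) // fine: snapshot is newer than what was applied

	defer func() {
		if r := recover(); r != nil {
			log.Printf("stale snapshot rejected: %v", r)
		}
	}()
	checkSnap(1, 1) // stale: panics, the scenario the removed test exercised
}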

rafthttp/peer.go

@ -165,7 +165,6 @@ func (p *peer) Send(m raftpb.Message) {
    select {
    case p.sendc <- m:
    case <-p.done:
-        log.Panicf("peer: unexpected stopped")
    }
}
@ -173,7 +172,6 @@ func (p *peer) Update(urls types.URLs) {
    select {
    case p.newURLsC <- urls:
    case <-p.done:
-        log.Panicf("peer: unexpected stopped")
    }
}
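
With the Panicf calls gone, Send and Update become best-effort: offering a message to an already-stopped peer silently drops it instead of crashing the process. A standalone sketch of the resulting select pattern, with illustrative channel names and types (not the peer's real fields):

package main

import "fmt"

// send mimics peer.Send after this change: if the peer has already been
// stopped (done closed), the message falls through to the done case and
// is dropped rather than panicking.
func send(sendc chan string, done chan struct{}, m string) {
	select {
	case sendc <- m:
		fmt.Println("queued:", m)
	case <-done:
		fmt.Println("dropped (peer stopped):", m)
	}
}

func main() {
	sendc := make(chan string, 1)
	done := make(chan struct{})

	send(sendc, done, "msgapp") // buffer has room: queued
	close(done)
	send(sendc, done, "heartbeat") // buffer full and peer stopped: dropped
}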