From d015610da52fbe655937e72d42e5e4f22390e9b1 Mon Sep 17 00:00:00 2001
From: Xiang Li
Date: Thu, 5 Mar 2015 13:03:04 -0800
Subject: [PATCH] etcdserver: separate apply and raft routine

---
 etcdserver/raft.go        |  90 +++++++++++++++++++++++++++-
 etcdserver/server.go      | 120 ++++++++++++++------------------------
 etcdserver/server_test.go |  37 +----------
 rafthttp/peer.go          |   2 -
 4 files changed, 132 insertions(+), 117 deletions(-)

diff --git a/etcdserver/raft.go b/etcdserver/raft.go
index a2525844f..bda306a43 100644
--- a/etcdserver/raft.go
+++ b/etcdserver/raft.go
@@ -20,6 +20,7 @@ import (
 	"log"
 	"os"
 	"sort"
+	"sync/atomic"
 	"time"
 
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
@@ -59,11 +60,25 @@ type RaftTimer interface {
 	Term() uint64
 }
 
+// apply contains the entries and the snapshot to be applied.
+// After applying all the items, the application needs to
+// send a notification on the done chan.
+type apply struct {
+	entries  []raftpb.Entry
+	snapshot raftpb.Snapshot
+	done     chan struct{}
+}
+
 type raftNode struct {
 	raft.Node
 
-	// config
-	snapCount uint64 // number of entries to trigger a snapshot
+	// a chan to send out apply
+	applyc chan apply
+
+	// TODO: remove the etcdserver related logic from raftNode
+	// TODO: add a state machine interface to apply the committed entries
+	// and do snapshot/recover
+	s *EtcdServer
 
 	// utility
 	ticker <-chan time.Time
@@ -81,6 +96,77 @@ type raftNode struct {
 	lead uint64
 }
 
+func (r *raftNode) run() {
+	var syncC <-chan time.Time
+
+	defer r.stop()
+	for {
+		select {
+		case <-r.ticker:
+			r.Tick()
+		case rd := <-r.Ready():
+			if rd.SoftState != nil {
+				atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+				if rd.RaftState == raft.StateLeader {
+					syncC = r.s.SyncTicker
+					// TODO: remove the nil checking
+					// the current test utility does not provide stats
+					if r.s.stats != nil {
+						r.s.stats.BecomeLeader()
+					}
+				} else {
+					syncC = nil
+				}
+			}
+
+			apply := apply{
+				entries:  rd.CommittedEntries,
+				snapshot: rd.Snapshot,
+				done:     make(chan struct{}),
+			}
+
+			select {
+			case r.applyc <- apply:
+			case <-r.s.done:
+				return
+			}
+
+			if !raft.IsEmptySnap(rd.Snapshot) {
+				if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
+					log.Fatalf("etcdraft: save snapshot error: %v", err)
+				}
+				r.raftStorage.ApplySnapshot(rd.Snapshot)
+				log.Printf("etcdraft: applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
+			}
+			if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+				log.Fatalf("etcdraft: save state and entries error: %v", err)
+			}
+			r.raftStorage.Append(rd.Entries)
+
+			r.s.send(rd.Messages)
+
+			<-apply.done
+			r.Advance()
+		case <-syncC:
+			r.s.sync(defaultSyncTimeout)
+		case <-r.s.done:
+			return
+		}
+	}
+}
+
+func (r *raftNode) apply() chan apply {
+	return r.applyc
+}
+
+func (r *raftNode) stop() {
+	r.Stop()
+	r.transport.Stop()
+	if err := r.storage.Close(); err != nil {
+		log.Panicf("etcdraft: close storage error: %v", err)
+	}
+}
+
 // for testing
 func (r *raftNode) pauseSending() {
 	p := r.transport.(rafthttp.Pausable)
diff --git a/etcdserver/server.go b/etcdserver/server.go
index 1ca9066a5..033aab3d6 100644
--- a/etcdserver/server.go
+++ b/etcdserver/server.go
@@ -110,7 +110,8 @@ type Server interface {
 
 // EtcdServer is the production implementation of the Server interface
 type EtcdServer struct {
-	cfg *ServerConfig
+	cfg       *ServerConfig
+	snapCount uint64
 
 	r raftNode
 
@@ -237,12 +238,12 @@ func NewServer(cfg *ServerConfig) (*EtcdServer, error) {
 	lstats := stats.NewLeaderStats(id.String())
 	srv := &EtcdServer{
-		cfg:    cfg,
-		errorc: make(chan error, 1),
-		store:  st,
+		cfg:       cfg,
+		snapCount: cfg.SnapCount,
+		errorc:    make(chan error, 1),
+		store:     st,
 		r: raftNode{
 			Node:        n,
-			snapCount:   cfg.SnapCount,
 			ticker:      time.Tick(time.Duration(cfg.TickMs) * time.Millisecond),
 			raftStorage: s,
 			storage:     NewStorage(w, ss),
@@ -280,9 +281,9 @@ func (s *EtcdServer) Start() {
 // modify a server's fields after it has been sent to Start.
 // This function is just used for testing.
 func (s *EtcdServer) start() {
-	if s.r.snapCount == 0 {
+	if s.snapCount == 0 {
 		log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
-		s.r.snapCount = DefaultSnapCount
+		s.snapCount = DefaultSnapCount
 	}
 	s.w = wait.New()
 	s.done = make(chan struct{})
@@ -333,73 +334,37 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
 }
 
 func (s *EtcdServer) run() {
-	var syncC <-chan time.Time
-	var shouldstop bool
-
-	// load initial state from raft storage
 	snap, err := s.r.raftStorage.Snapshot()
 	if err != nil {
 		log.Panicf("etcdserver: get snapshot from raft storage error: %v", err)
 	}
-	// snapi indicates the index of the last submitted snapshot request
-	snapi := snap.Metadata.Index
-	appliedi := snap.Metadata.Index
 	confState := snap.Metadata.ConfState
+	snapi := snap.Metadata.Index
+	appliedi := snapi
+	// TODO: get rid of the raft initialization in etcd server
+	s.r.s = s
+	s.r.applyc = make(chan apply)
+	go s.r.run()
+	defer close(s.done)
 
-	defer func() {
-		s.r.Stop()
-		s.r.transport.Stop()
-		if err := s.r.storage.Close(); err != nil {
-			log.Panicf("etcdserver: close storage error: %v", err)
-		}
-		close(s.done)
-	}()
-	// TODO: make raft loop a method on raftNode
+	var shouldstop bool
 	for {
 		select {
-		case <-s.r.ticker:
-			s.r.Tick()
-		case rd := <-s.r.Ready():
-			if rd.SoftState != nil {
-				atomic.StoreUint64(&s.r.lead, rd.SoftState.Lead)
-				if rd.RaftState == raft.StateLeader {
-					syncC = s.SyncTicker
-					// TODO: remove the nil checking
-					// current test utility does not provide the stats
-					if s.stats != nil {
-						s.stats.BecomeLeader()
-					}
-				} else {
-					syncC = nil
+		case apply := <-s.r.apply():
+			// apply snapshot
+			if !raft.IsEmptySnap(apply.snapshot) {
+				if apply.snapshot.Metadata.Index <= appliedi {
+					log.Panicf("etcdserver: snapshot index [%d] should be greater than appliedi [%d]",
+						apply.snapshot.Metadata.Index, appliedi)
 				}
-			}
-			// apply snapshot to storage if it is more updated than current snapi
-			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > snapi {
-				if err := s.r.storage.SaveSnap(rd.Snapshot); err != nil {
-					log.Fatalf("etcdserver: save snapshot error: %v", err)
-				}
-				s.r.raftStorage.ApplySnapshot(rd.Snapshot)
-				snapi = rd.Snapshot.Metadata.Index
-				log.Printf("etcdserver: saved incoming snapshot at index %d", snapi)
-			}
-
-			if err := s.r.storage.Save(rd.HardState, rd.Entries); err != nil {
-				log.Fatalf("etcdserver: save state and entries error: %v", err)
-			}
-			s.r.raftStorage.Append(rd.Entries)
-
-			s.send(rd.Messages)
-
-			// recover from snapshot if it is more updated than current applied
-			if !raft.IsEmptySnap(rd.Snapshot) && rd.Snapshot.Metadata.Index > appliedi {
-				if err := s.store.Recovery(rd.Snapshot.Data); err != nil {
+				if err := s.store.Recovery(apply.snapshot.Data); err != nil {
 					log.Panicf("recovery store error: %v", err)
 				}
-				// It avoids snapshot recovery overwriting newer cluster and
+				// Avoid snapshot recovery overwriting newer cluster and
 				// transport setting, which may block the communication.
-				if s.Cluster.index < rd.Snapshot.Metadata.Index {
+				if s.Cluster.index < apply.snapshot.Metadata.Index {
 					s.Cluster.Recover()
 					// recover raft transport
 					s.r.transport.RemoveAllPeers()
@@ -411,38 +376,38 @@ func (s *EtcdServer) run() {
 					}
 				}
 
-				appliedi = rd.Snapshot.Metadata.Index
-				confState = rd.Snapshot.Metadata.ConfState
+				appliedi = apply.snapshot.Metadata.Index
+				snapi = appliedi
+				confState = apply.snapshot.Metadata.ConfState
 				log.Printf("etcdserver: recovered from incoming snapshot at index %d", snapi)
 			}
-			// TODO(bmizerany): do this in the background, but take
-			// care to apply entries in a single goroutine, and not
-			// race them.
-			if len(rd.CommittedEntries) != 0 {
-				firsti := rd.CommittedEntries[0].Index
+
+			// apply entries
+			if len(apply.entries) != 0 {
+				firsti := apply.entries[0].Index
 				if firsti > appliedi+1 {
 					log.Panicf("etcdserver: first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, appliedi)
 				}
 				var ents []raftpb.Entry
-				if appliedi+1-firsti < uint64(len(rd.CommittedEntries)) {
-					ents = rd.CommittedEntries[appliedi+1-firsti:]
+				if appliedi+1-firsti < uint64(len(apply.entries)) {
+					ents = apply.entries[appliedi+1-firsti:]
 				}
-				if len(ents) > 0 {
-					if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
-						go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
-					}
+				if appliedi, shouldstop = s.apply(ents, &confState); shouldstop {
+					go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
 				}
 			}
-			s.r.Advance()
+			// wait for the raft routine to finish the disk writes before triggering a
+			// snapshot; otherwise the applied index might be greater than the last index
+			// in raft storage, since the raft routine might be slower than the apply routine.
+			apply.done <- struct{}{}
 
-			if appliedi-snapi > s.r.snapCount {
+			// trigger snapshot
+			if appliedi-snapi > s.snapCount {
 				log.Printf("etcdserver: start to snapshot (applied: %d, lastsnap: %d)", appliedi, snapi)
 				s.snapshot(appliedi, confState)
 				snapi = appliedi
 			}
-		case <-syncC:
-			s.sync(defaultSyncTimeout)
 		case err := <-s.errorc:
 			log.Printf("etcdserver: %s", err)
 			log.Printf("etcdserver: the data-dir used by this member must be removed.")
@@ -451,6 +416,7 @@ func (s *EtcdServer) run() {
 			return
 		}
 	}
+	// TODO: wait for the raft node routine to stop?
 }
 
 // Stop stops the server gracefully, and shuts down the running goroutine.
diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go
index 063f05328..a7cd4a42b 100644
--- a/etcdserver/server_test.go
+++ b/etcdserver/server_test.go
@@ -738,9 +738,9 @@ func TestTriggerSnap(t *testing.T) {
 	st := &storeRecorder{}
 	p := &storageRecorder{}
 	srv := &EtcdServer{
+		snapCount: uint64(snapc),
 		r: raftNode{
 			Node:        newNodeCommitter(),
-			snapCount:   uint64(snapc),
 			raftStorage: raft.NewMemoryStorage(),
 			storage:     p,
 			transport:   &nopTransporter{},
@@ -801,41 +801,6 @@ func TestRecvSnapshot(t *testing.T) {
 	}
 }
 
-// TestRecvSlowSnapshot tests that slow snapshot will not be applied
-// to store. The case could happen when server compacts the log and
-// raft returns the compacted snapshot.
-func TestRecvSlowSnapshot(t *testing.T) { - n := newReadyNode() - st := &storeRecorder{} - cl := newCluster("abc") - cl.SetStore(store.New()) - s := &EtcdServer{ - r: raftNode{ - Node: n, - storage: &storageRecorder{}, - raftStorage: raft.NewMemoryStorage(), - transport: &nopTransporter{}, - }, - store: st, - Cluster: cl, - } - - s.start() - n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}} - // make goroutines move forward to receive snapshot - testutil.ForceGosched() - action := st.Action() - - n.readyc <- raft.Ready{Snapshot: raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 1}}} - // make goroutines move forward to receive snapshot - testutil.ForceGosched() - s.Stop() - - if g := st.Action(); !reflect.DeepEqual(g, action) { - t.Errorf("store action = %v, want %v", g, action) - } -} - // TestApplySnapshotAndCommittedEntries tests that server applies snapshot // first and then committed entries. func TestApplySnapshotAndCommittedEntries(t *testing.T) { diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 42abc4f00..f787efe40 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -165,7 +165,6 @@ func (p *peer) Send(m raftpb.Message) { select { case p.sendc <- m: case <-p.done: - log.Panicf("peer: unexpected stopped") } } @@ -173,7 +172,6 @@ func (p *peer) Update(urls types.URLs) { select { case p.newURLsC <- urls: case <-p.done: - log.Panicf("peer: unexpected stopped") } }
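
---

Reviewer note: the heart of this change is the rendezvous between the raft
routine (raftNode.run) and the apply routine (EtcdServer.run). The raft routine
hands each Ready batch over on applyc and, after finishing its disk writes,
blocks on the batch's done channel; the apply routine's send on done cannot
complete until then, so the snapshot trigger never observes an applied index
that raft storage has not yet persisted. Below is a minimal, self-contained Go
sketch of that pattern. It is illustrative only, not part of the patch: the
batch type and every name in it are invented stand-ins for the apply struct
and the raft types above.

	package main

	import (
		"fmt"
		"time"
	)

	// batch mirrors the apply struct added in etcdserver/raft.go: a unit of
	// work plus a done channel used as a rendezvous between the two routines.
	type batch struct {
		entries []int // stand-in for []raftpb.Entry
		done    chan struct{}
	}

	func main() {
		applyc := make(chan batch)

		// raft-routine stand-in: publish a batch, do the "disk writes", then
		// block on done before advancing, as raftNode.run blocks on <-apply.done.
		go func() {
			defer close(applyc)
			for i := 0; i < 3; i++ {
				b := batch{entries: []int{i}, done: make(chan struct{})}
				applyc <- b                      // hand the batch to the apply routine
				time.Sleep(5 * time.Millisecond) // pretend to persist state and entries
				<-b.done                         // rendezvous: both sides are finished
				fmt.Printf("raft routine: advanced past batch %d\n", i)
			}
		}()

		// apply-routine stand-in: apply each batch, then signal done. Because
		// done is unbuffered, this send cannot complete until the producer has
		// finished its writes and reached <-b.done.
		for b := range applyc {
			fmt.Println("apply routine: applying entries", b.entries)
			b.done <- struct{}{}
		}
	}

Both channels are unbuffered on purpose: each send is a synchronization point,
which is what makes apply.done <- struct{}{} in EtcdServer.run a safe barrier
before the appliedi-snapi > s.snapCount check.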