diff --git a/contrib/raftexample/httpapi.go b/contrib/raftexample/httpapi.go index 044066a33..10d3a5d9b 100644 --- a/contrib/raftexample/httpapi.go +++ b/contrib/raftexample/httpapi.go @@ -18,7 +18,6 @@ import ( "io/ioutil" "log" "net/http" - "os" "strconv" "github.com/coreos/etcd/raft/raftpb" @@ -102,25 +101,22 @@ func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { } // serveHttpKVAPI starts a key-value server with a GET/PUT API and listens. -func serveHttpKVAPI(port int, proposeC chan<- string, confChangeC chan<- raftpb.ConfChange, - commitC <-chan *string, errorC <-chan error) { - - // exit when raft goes down - go func() { - if err, ok := <-errorC; ok { - log.Fatal(err) - } - os.Exit(0) - }() - +func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) { srv := http.Server{ Addr: ":" + strconv.Itoa(port), Handler: &httpKVAPI{ - store: newKVStore(proposeC, commitC, errorC), + store: kv, confChangeC: confChangeC, }, } - if err := srv.ListenAndServe(); err != nil { + go func() { + if err := srv.ListenAndServe(); err != nil { + log.Fatal(err) + } + }() + + // exit when raft goes down + if err, ok := <-errorC; ok { log.Fatal(err) } } diff --git a/contrib/raftexample/kvstore.go b/contrib/raftexample/kvstore.go index 9765d4232..5651cf0f5 100644 --- a/contrib/raftexample/kvstore.go +++ b/contrib/raftexample/kvstore.go @@ -17,15 +17,19 @@ package main import ( "bytes" "encoding/gob" + "encoding/json" "log" "sync" + + "github.com/coreos/etcd/snap" ) // a key-value store backed by raft type kvstore struct { - proposeC chan<- string // channel for proposing updates - mu sync.RWMutex - kvStore map[string]string // current committed key-value pairs + proposeC chan<- string // channel for proposing updates + mu sync.RWMutex + kvStore map[string]string // current committed key-value pairs + snapshotter *snap.Snapshotter } type kv struct { @@ -33,8 +37,8 @@ type kv struct { Val string } -func newKVStore(proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { - s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string)} +func newKVStore(snapshotter *snap.Snapshotter, proposeC chan<- string, commitC <-chan *string, errorC <-chan error) *kvstore { + s := &kvstore{proposeC: proposeC, kvStore: make(map[string]string), snapshotter: snapshotter} // replay log into key-value map s.readCommits(commitC, errorC) // read commits from raft into kvStore map until error @@ -61,7 +65,18 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { for data := range commitC { if data == nil { // done replaying log; new data incoming - return + // OR signaled to load snapshot + snapshot, err := s.snapshotter.Load() + if err == snap.ErrNoSnapshot { + return + } + if err != nil && err != snap.ErrNoSnapshot { + log.Panic(err) + } + log.Printf("loading snapshot at term %d and index %d", snapshot.Metadata.Term, snapshot.Metadata.Index) + if err := s.recoverFromSnapshot(snapshot.Data); err != nil { + log.Panic(err) + } } var dataKv kv @@ -77,3 +92,20 @@ func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) { log.Fatal(err) } } + +func (s *kvstore) getSnapshot() ([]byte, error) { + s.mu.Lock() + defer s.mu.Unlock() + return json.Marshal(s.kvStore) +} + +func (s *kvstore) recoverFromSnapshot(snapshot []byte) error { + var store map[string]string + if err := json.Unmarshal(snapshot, &store); err != nil { + return err + } + s.mu.Lock() + s.kvStore = store + s.mu.Unlock() + return nil +} diff --git a/contrib/raftexample/kvstore_test.go b/contrib/raftexample/kvstore_test.go new file mode 100644 index 000000000..231f778f2 --- /dev/null +++ b/contrib/raftexample/kvstore_test.go @@ -0,0 +1,47 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "reflect" + "testing" +) + +func Test_kvstore_snapshot(t *testing.T) { + tm := map[string]string{"foo": "bar"} + s := &kvstore{kvStore: tm} + + v, _ := s.Lookup("foo") + if v != "bar" { + t.Fatalf("foo has unexpected value, got %s", v) + } + + data, err := s.getSnapshot() + if err != nil { + t.Fatal(err) + } + s.kvStore = nil + + if err := s.recoverFromSnapshot(data); err != nil { + t.Fatal(err) + } + v, _ = s.Lookup("foo") + if v != "bar" { + t.Fatalf("foo has unexpected value, got %s", v) + } + if !reflect.DeepEqual(s.kvStore, tm) { + t.Fatalf("store expected %+v, got %+v", tm, s.kvStore) + } +} diff --git a/contrib/raftexample/main.go b/contrib/raftexample/main.go index 8c3943a5b..582692469 100644 --- a/contrib/raftexample/main.go +++ b/contrib/raftexample/main.go @@ -34,8 +34,12 @@ func main() { defer close(confChangeC) // raft provides a commit stream for the proposals from the http api - commitC, errorC := newRaftNode(*id, strings.Split(*cluster, ","), *join, proposeC, confChangeC) + var kvs *kvstore + getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() } + commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC) + + kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC) // the key-value http handler will propose updates to raft - serveHttpKVAPI(*kvport, proposeC, confChangeC, commitC, errorC) + serveHttpKVAPI(kvs, *kvport, confChangeC, errorC) } diff --git a/contrib/raftexample/raft.go b/contrib/raftexample/raft.go index f9f82b08d..b1defe2c3 100644 --- a/contrib/raftexample/raft.go +++ b/contrib/raftexample/raft.go @@ -25,10 +25,12 @@ import ( "net/url" "github.com/coreos/etcd/etcdserver/stats" + "github.com/coreos/etcd/pkg/fileutil" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/raft" "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/rafthttp" + "github.com/coreos/etcd/snap" "github.com/coreos/etcd/wal" "github.com/coreos/etcd/wal/walpb" "golang.org/x/net/context" @@ -41,29 +43,42 @@ type raftNode struct { commitC chan<- *string // entries committed to log (k,v) errorC chan<- error // errors from raft session - id int // client ID for raft session - peers []string // raft peer URLs - join bool // node is joining an existing cluster - waldir string // path to WAL directory - lastIndex uint64 // index of log at start + id int // client ID for raft session + peers []string // raft peer URLs + join bool // node is joining an existing cluster + waldir string // path to WAL directory + snapdir string // path to snapshot directory + getSnapshot func() ([]byte, error) + lastIndex uint64 // index of log at start + + confState raftpb.ConfState + snapshotIndex uint64 + appliedIndex uint64 // raft backing for the commit/error channel node raft.Node raftStorage *raft.MemoryStorage wal *wal.WAL - transport *rafthttp.Transport - stopc chan struct{} // signals proposal channel closed - httpstopc chan struct{} // signals http server to shutdown - httpdonec chan struct{} // signals http server shutdown complete + + snapshotter *snap.Snapshotter + snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready + + snapCount uint64 + transport *rafthttp.Transport + stopc chan struct{} // signals proposal channel closed + httpstopc chan struct{} // signals http server to shutdown + httpdonec chan struct{} // signals http server shutdown complete } +var defaultSnapCount uint64 = 10000 + // newRaftNode initiates a raft instance and returns a committed log entry // channel and error channel. Proposals for log updates are sent over the // provided the proposal channel. All log entries are replayed over the // commit channel, followed by a nil message (to indicate the channel is // current), then new log entries. To shutdown, close proposeC and read errorC. -func newRaftNode(id int, peers []string, join bool, proposeC <-chan string, - confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error) { +func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string, + confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) { commitC := make(chan *string) errorC := make(chan error) @@ -77,14 +92,47 @@ func newRaftNode(id int, peers []string, join bool, proposeC <-chan string, peers: peers, join: join, waldir: fmt.Sprintf("raftexample-%d", id), + snapdir: fmt.Sprintf("raftexample-%d-snap", id), + getSnapshot: getSnapshot, raftStorage: raft.NewMemoryStorage(), + snapCount: defaultSnapCount, stopc: make(chan struct{}), httpstopc: make(chan struct{}), httpdonec: make(chan struct{}), + + snapshotterReady: make(chan *snap.Snapshotter, 1), // rest of structure populated after WAL replay } go rc.startRaft() - return commitC, errorC + return commitC, errorC, rc.snapshotterReady +} + +func (rc *raftNode) saveSnap(snap raftpb.Snapshot) error { + if err := rc.snapshotter.SaveSnap(snap); err != nil { + return err + } + walSnap := walpb.Snapshot{ + Index: snap.Metadata.Index, + Term: snap.Metadata.Term, + } + if err := rc.wal.SaveSnapshot(walSnap); err != nil { + return err + } + return rc.wal.ReleaseLockTo(snap.Metadata.Index) +} + +func (rc *raftNode) entriesToApply(ents []raftpb.Entry) (nents []raftpb.Entry) { + if len(ents) == 0 { + return + } + firstIdx := ents[0].Index + if firstIdx > rc.appliedIndex+1 { + log.Fatalf("first index of committed entry[%d] should <= progress.appliedIndex[%d] 1", firstIdx, rc.appliedIndex) + } + if rc.appliedIndex-firstIdx+1 < uint64(len(ents)) { + nents = ents[rc.appliedIndex-firstIdx+1:] + } + return } // publishEntries writes committed log entries to commit channel and returns @@ -122,6 +170,9 @@ func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool { } } + // after commit, update appliedIndex + rc.appliedIndex = ents[i].Index + // special nil commit to signal replay has finished if ents[i].Index == rc.lastIndex { select { @@ -184,6 +235,14 @@ func (rc *raftNode) writeError(err error) { } func (rc *raftNode) startRaft() { + if !fileutil.Exist(rc.snapdir) { + if err := os.Mkdir(rc.snapdir, 0750); err != nil { + log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err) + } + } + rc.snapshotter = snap.New(rc.snapdir) + rc.snapshotterReady <- rc.snapshotter + oldwal := wal.Exist(rc.waldir) rc.wal = rc.replayWAL() @@ -247,7 +306,65 @@ func (rc *raftNode) stopHTTP() { <-rc.httpdonec } +func (rc *raftNode) publishSnapshot(snapshotToSave raftpb.Snapshot) { + if raft.IsEmptySnap(snapshotToSave) { + return + } + + log.Printf("publishing snapshot at index %d", rc.snapshotIndex) + defer log.Printf("finished publishing snapshot at index %d", rc.snapshotIndex) + + if snapshotToSave.Metadata.Index <= rc.appliedIndex { + log.Fatalf("snapshot index [%d] should > progress.appliedIndex [%d] + 1", snapshotToSave.Metadata.Index, rc.appliedIndex) + } + rc.commitC <- nil // trigger kvstore to load snapshot + + rc.confState = snapshotToSave.Metadata.ConfState + rc.snapshotIndex = snapshotToSave.Metadata.Index + rc.appliedIndex = snapshotToSave.Metadata.Index +} + +var snapshotCatchUpEntriesN uint64 = 10000 + +func (rc *raftNode) maybeTriggerSnapshot() { + if rc.appliedIndex-rc.snapshotIndex <= rc.snapCount { + return + } + + log.Printf("start snapshot [applied index: %d | last snapshot index: %d]", rc.appliedIndex, rc.snapshotIndex) + data, err := rc.getSnapshot() + if err != nil { + log.Panic(err) + } + snap, err := rc.raftStorage.CreateSnapshot(rc.appliedIndex, &rc.confState, data) + if err != nil { + panic(err) + } + if err := rc.saveSnap(snap); err != nil { + panic(err) + } + + compactIndex := uint64(1) + if rc.appliedIndex > snapshotCatchUpEntriesN { + compactIndex = rc.appliedIndex - snapshotCatchUpEntriesN + } + if err := rc.raftStorage.Compact(compactIndex); err != nil { + panic(err) + } + + log.Printf("compacted log at index %d", compactIndex) + rc.snapshotIndex = rc.appliedIndex +} + func (rc *raftNode) serveChannels() { + snap, err := rc.raftStorage.Snapshot() + if err != nil { + panic(err) + } + rc.confState = snap.Metadata.ConfState + rc.snapshotIndex = snap.Metadata.Index + rc.appliedIndex = snap.Metadata.Index + defer rc.wal.Close() ticker := time.NewTicker(100 * time.Millisecond) @@ -290,12 +407,18 @@ func (rc *raftNode) serveChannels() { // store raft entries to wal, then publish over commit channel case rd := <-rc.node.Ready(): rc.wal.Save(rd.HardState, rd.Entries) + if !raft.IsEmptySnap(rd.Snapshot) { + rc.saveSnap(rd.Snapshot) + rc.raftStorage.ApplySnapshot(rd.Snapshot) + rc.publishSnapshot(rd.Snapshot) + } rc.raftStorage.Append(rd.Entries) rc.transport.Send(rd.Messages) - if ok := rc.publishEntries(rd.CommittedEntries); !ok { + if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok { rc.stop() return } + rc.maybeTriggerSnapshot() rc.node.Advance() case err := <-rc.transport.ErrorC: diff --git a/contrib/raftexample/raftexample_test.go b/contrib/raftexample/raftexample_test.go index 33a2448aa..1ee0ca5c3 100644 --- a/contrib/raftexample/raftexample_test.go +++ b/contrib/raftexample/raftexample_test.go @@ -47,9 +47,10 @@ func newCluster(n int) *cluster { for i := range clus.peers { os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1)) clus.proposeC[i] = make(chan string, 1) clus.confChangeC[i] = make(chan raftpb.ConfChange, 1) - clus.commitC[i], clus.errorC[i] = newRaftNode(i+1, clus.peers, false, clus.proposeC[i], clus.confChangeC[i]) + clus.commitC[i], clus.errorC[i], _ = newRaftNode(i+1, clus.peers, false, nil, clus.proposeC[i], clus.confChangeC[i]) } return clus @@ -79,6 +80,7 @@ func (clus *cluster) Close() (err error) { } // clean intermediates os.RemoveAll(fmt.Sprintf("raftexample-%d", i+1)) + os.RemoveAll(fmt.Sprintf("raftexample-%d-snap", i+1)) } return err }