From 81a73dbc8003ca854a9e1e52ae30a6cb089b1540 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Fri, 20 Dec 2013 15:39:17 -0700 Subject: [PATCH] bump(github.com/coreos/raft): ac7be58b1bec49dfcfc7216df4ae27173da1fa57 --- .../github.com/coreos/raft/.travis.yml | 4 +- third_party/github.com/coreos/raft/Makefile | 1 + third_party/github.com/coreos/raft/README.md | 52 ++++++++++- third_party/github.com/coreos/raft/event.go | 55 ++++++++++++ .../coreos/raft/event_dispatcher.go | 50 +++++++++++ .../coreos/raft/event_dispatcher_test.go | 45 ++++++++++ .../coreos/raft/http_transporter.go | 8 +- third_party/github.com/coreos/raft/log.go | 21 +++-- third_party/github.com/coreos/raft/peer.go | 4 +- third_party/github.com/coreos/raft/server.go | 87 +++++++++++++++---- .../github.com/coreos/raft/server_test.go | 40 ++++++++- .../github.com/coreos/raft/statemachine.go | 3 +- third_party/github.com/coreos/raft/test.go | 13 +++ 13 files changed, 342 insertions(+), 41 deletions(-) create mode 100644 third_party/github.com/coreos/raft/event.go create mode 100644 third_party/github.com/coreos/raft/event_dispatcher.go create mode 100644 third_party/github.com/coreos/raft/event_dispatcher_test.go diff --git a/third_party/github.com/coreos/raft/.travis.yml b/third_party/github.com/coreos/raft/.travis.yml index 5f70bdf4c..4e10f5ffe 100644 --- a/third_party/github.com/coreos/raft/.travis.yml +++ b/third_party/github.com/coreos/raft/.travis.yml @@ -1,8 +1,10 @@ language: go go: - - 1.1 + - 1.1.2 + - 1.2 install: + - go get github.com/stretchr/testify/assert - make dependencies diff --git a/third_party/github.com/coreos/raft/Makefile b/third_party/github.com/coreos/raft/Makefile index a5014509e..9847fe2de 100644 --- a/third_party/github.com/coreos/raft/Makefile +++ b/third_party/github.com/coreos/raft/Makefile @@ -8,6 +8,7 @@ dependencies: go get -d . test: + go test -i ./... go test -v ./... .PHONY: coverage dependencies test diff --git a/third_party/github.com/coreos/raft/README.md b/third_party/github.com/coreos/raft/README.md index 9d189ff6d..5486cc569 100644 --- a/third_party/github.com/coreos/raft/README.md +++ b/third_party/github.com/coreos/raft/README.md @@ -34,11 +34,27 @@ go-raft is under the MIT license. These projects are built on go-raft: -- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery -- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus. +- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery. +- [goraft/raftd](https://github.com/goraft/raftd) - A reference implementation for using the go-raft library for distributed consensus. +- [skynetservices/skydns](https://github.com/skynetservices/skydns) - DNS for skynet or any other service discovery. +- [influxdb/influxdb](https://github.com/influxdb/influxdb) - An open-source, distributed, time series, events, and metrics database. If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples. +## Contact and Resources + +- [raft-dev][raft-dev] is a mailing list for discussion about best practices + and implementation of Raft. Not goraft specific but helpful if you have + questions. +- [Slides from Ben's talk][bens-talk] which includes easy to understand + diagrams of leader election and replication +- The [Raft Consensus homepage][raft-home] has links to additional raft + implementations, slides to talks on Raft and general information + +[raft-home]: http://raftconsensus.github.io/ +[raft-dev]: https://groups.google.com/forum/#!forum/raft-dev +[bens-talk]: https://speakerdeck.com/benbjohnson/raft-the-understandable-distributed-consensus-protocol + ## The Raft Protocol This section provides a summary of the Raft protocol from a high level. @@ -83,6 +99,38 @@ By ensuring that this log is replicated identically between all the nodes in the Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers). Each peer will append the entries from the leader through a 2-phase commit process which ensure that a majority of servers in the cluster have entries written to log. + +## Raft in Practice + +### Optimal Cluster Size + +The primary consideration when choosing the node count in your Raft cluster is the number of nodes that can simultaneously fail. +Because Raft requires a majority of nodes to be available to make progress, the number of node failures the cluster can tolerate is `(n / 2) - 1`. + +This means that a 3-node cluster can tolerate 1 node failure. +If 2 nodes fail then the cluster cannot commit entries or elect a new leader so progress stops. +A 5-node cluster can tolerate 2 node failures. A 9-node cluster can tolerate 4 node failures. +It is unlikely that 4 nodes will simultaneously fail so clusters larger than 9 nodes are not common. + +Another consideration is performance. +The leader must replicate log entries for each follower node so CPU and networking resources can quickly be bottlenecked under stress in a large cluster. + + +### Scaling Raft + +Once you grow beyond the maximum size of your cluster there are a few options for scaling Raft: + +1. *Core nodes with dumb replication.* + This option requires you to maintain a small cluster (e.g. 5 nodes) that is involved in the Raft process and then replicate only committed log entries to the remaining nodes in the cluster. + This works well if you have reads in your system that can be stale. + +2. *Sharding.* + This option requires that you segment your data into different clusters. + This option works well if you need very strong consistency and therefore need to read and write heavily from the leader. + +If you have a very large cluster that you need to replicate to using Option 1 then you may want to look at performing hierarchical replication so that nodes can better share the load. + + ## History Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky). diff --git a/third_party/github.com/coreos/raft/event.go b/third_party/github.com/coreos/raft/event.go new file mode 100644 index 000000000..8cd20bf66 --- /dev/null +++ b/third_party/github.com/coreos/raft/event.go @@ -0,0 +1,55 @@ +package raft + +const ( + StateChangeEventType = "stateChange" + LeaderChangeEventType = "leaderChange" + TermChangeEventType = "termChange" + AddPeerEventType = "addPeer" + RemovePeerEventType = "removePeer" +) + +// Event represents an action that occurred within the Raft library. +// Listeners can subscribe to event types by using the Server.AddEventListener() function. +type Event interface { + Type() string + Source() interface{} + Value() interface{} + PrevValue() interface{} +} + +// event is the concrete implementation of the Event interface. +type event struct { + typ string + source interface{} + value interface{} + prevValue interface{} +} + +// newEvent creates a new event. +func newEvent(typ string, value interface{}, prevValue interface{}) *event { + return &event{ + typ: typ, + value: value, + prevValue: prevValue, + } +} + +// Type returns the type of event that occurred. +func (e *event) Type() string { + return e.typ +} + +// Source returns the object that dispatched the event. +func (e *event) Source() interface{} { + return e.source +} + +// Value returns the current value associated with the event, if applicable. +func (e *event) Value() interface{} { + return e.value +} + +// PrevValue returns the previous value associated with the event, if applicable. +func (e *event) PrevValue() interface{} { + return e.prevValue +} diff --git a/third_party/github.com/coreos/raft/event_dispatcher.go b/third_party/github.com/coreos/raft/event_dispatcher.go new file mode 100644 index 000000000..e985a99fc --- /dev/null +++ b/third_party/github.com/coreos/raft/event_dispatcher.go @@ -0,0 +1,50 @@ +package raft + +import ( + "sync" +) + +// eventDispatcher is responsible for managing listeners for named events +// and dispatching event notifications to those listeners. +type eventDispatcher struct { + sync.RWMutex + source interface{} + listeners map[string]eventListeners +} + +// EventListener is a function that can receive event notifications. +type EventListener func(Event) + +// EventListeners represents a collection of individual listeners. +type eventListeners []EventListener + +// newEventDispatcher creates a new eventDispatcher instance. +func newEventDispatcher(source interface{}) *eventDispatcher { + return &eventDispatcher{ + source: source, + listeners: make(map[string]eventListeners), + } +} + +// AddEventListener adds a listener function for a given event type. +func (d *eventDispatcher) AddEventListener(typ string, listener EventListener) { + d.Lock() + defer d.Unlock() + d.listeners[typ] = append(d.listeners[typ], listener) +} + +// DispatchEvent dispatches an event. +func (d *eventDispatcher) DispatchEvent(e Event) { + d.RLock() + defer d.RUnlock() + + // Automatically set the event source. + if e, ok := e.(*event); ok { + e.source = d.source + } + + // Dispatch the event to all listeners. + for _, l := range d.listeners[e.Type()] { + l(e) + } +} diff --git a/third_party/github.com/coreos/raft/event_dispatcher_test.go b/third_party/github.com/coreos/raft/event_dispatcher_test.go new file mode 100644 index 000000000..affdaaab9 --- /dev/null +++ b/third_party/github.com/coreos/raft/event_dispatcher_test.go @@ -0,0 +1,45 @@ +package raft + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +// Ensure that we can listen and dispatch events. +func TestDispatchEvent(t *testing.T) { + var count int + dispatcher := newEventDispatcher(nil) + dispatcher.AddEventListener("foo", func(e Event) { + count += 1 + }) + dispatcher.AddEventListener("foo", func(e Event) { + count += 10 + }) + dispatcher.AddEventListener("bar", func(e Event) { + count += 100 + }) + dispatcher.DispatchEvent(&event{typ: "foo", value: nil, prevValue: nil}) + assert.Equal(t, 11, count) +} + +// Ensure that event is properly passed to listener. +func TestEventListener(t *testing.T) { + dispatcher := newEventDispatcher("X") + dispatcher.AddEventListener("foo", func(e Event) { + assert.Equal(t, "foo", e.Type()) + assert.Equal(t, "X", e.Source()) + assert.Equal(t, 10, e.Value()) + assert.Equal(t, 20, e.PrevValue()) + }) + dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20}) +} + +// Benchmark the performance of event dispatch. +func BenchmarkEventDispatch(b *testing.B) { + dispatcher := newEventDispatcher(nil) + dispatcher.AddEventListener("xxx", func(e Event) {}) + for i := 0; i < b.N; i++ { + dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20}) + } +} diff --git a/third_party/github.com/coreos/raft/http_transporter.go b/third_party/github.com/coreos/raft/http_transporter.go index 80aa3e74e..de2fa67ac 100644 --- a/third_party/github.com/coreos/raft/http_transporter.go +++ b/third_party/github.com/coreos/raft/http_transporter.go @@ -23,6 +23,7 @@ type HTTPTransporter struct { prefix string appendEntriesPath string requestVotePath string + httpClient http.Client } type HTTPMuxer interface { @@ -42,6 +43,7 @@ func NewHTTPTransporter(prefix string) *HTTPTransporter { prefix: prefix, appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"), requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"), + httpClient: http.Client{Transport: &http.Transport{DisableKeepAlives: false}}, } } @@ -97,8 +99,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, re url := fmt.Sprintf("%s%s", peer.ConnectionString, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) - client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} - httpResp, err := client.Post(url, "application/protobuf", &b) + httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) if httpResp == nil || err != nil { traceln("transporter.ae.response.error:", err) return nil @@ -125,8 +126,7 @@ func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *Reques url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath()) traceln(server.Name(), "POST", url) - client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} - httpResp, err := client.Post(url, "application/protobuf", &b) + httpResp, err := t.httpClient.Post(url, "application/protobuf", &b) if httpResp == nil || err != nil { traceln("transporter.rv.response.error:", err) return nil diff --git a/third_party/github.com/coreos/raft/log.go b/third_party/github.com/coreos/raft/log.go index 757e4301f..b6eb133a5 100644 --- a/third_party/github.com/coreos/raft/log.go +++ b/third_party/github.com/coreos/raft/log.go @@ -333,7 +333,7 @@ func (l *Log) commitInfo() (index uint64, term uint64) { return entry.Index, entry.Term } -// Retrieves the last index and term that has been committed to the log. +// Retrieves the last index and term that has been appended to the log. func (l *Log) lastInfo() (index uint64, term uint64) { l.mutex.RLock() defer l.mutex.RUnlock() @@ -366,8 +366,7 @@ func (l *Log) setCommitIndex(index uint64) error { // this is not error any more after limited the number of sending entries // commit up to what we already have if index > l.startIndex+uint64(len(l.entries)) { - debugln("raft.StartIndex", l.startIndex) - debugln("raft.Log: Commit index", index, "set back to ", l.startIndex+uint64(len(l.entries))) + debugln("raft.Log: Commit index", index, "set back to ", len(l.entries)) index = l.startIndex + uint64(len(l.entries)) } @@ -387,7 +386,6 @@ func (l *Log) setCommitIndex(index uint64) error { // follower 2 should reply success and let leader 3 update the committed index to 80 if index < l.commitIndex { - debugln("raft.Log: index", index, "committedIndex", l.commitIndex) return nil } @@ -475,8 +473,7 @@ func (l *Log) truncate(index uint64, term uint64) error { // Append //-------------------------------------- -// Appends a series of entries to the log. These entries are not written to -// disk until setCommitIndex() is called. +// Appends a series of entries to the log. func (l *Log) appendEntries(entries []*LogEntry) error { l.mutex.Lock() defer l.mutex.Unlock() @@ -497,14 +494,20 @@ func (l *Log) appendEntries(entries []*LogEntry) error { startPosition += size } w.Flush() + err = l.file.Sync() + + if err != nil { + panic(err) + } return nil } -// Writes a single log entry to the end of the log. This function does not -// obtain a lock and should only be used internally. Use AppendEntries() and -// AppendEntry() to use it externally. +// Writes a single log entry to the end of the log. func (l *Log) appendEntry(entry *LogEntry) error { + l.mutex.Lock() + defer l.mutex.Unlock() + if l.file == nil { return errors.New("raft.Log: Log is not open") } diff --git a/third_party/github.com/coreos/raft/peer.go b/third_party/github.com/coreos/raft/peer.go index 516b535e4..05a398178 100644 --- a/third_party/github.com/coreos/raft/peer.go +++ b/third_party/github.com/coreos/raft/peer.go @@ -126,6 +126,8 @@ func (p *Peer) heartbeat(c chan bool) { c <- true + ticker := time.Tick(p.heartbeatTimeout) + debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout) for { @@ -142,7 +144,7 @@ func (p *Peer) heartbeat(c chan bool) { return } - case <-time.After(p.heartbeatTimeout): + case <-ticker: p.flush() } } diff --git a/third_party/github.com/coreos/raft/server.go b/third_party/github.com/coreos/raft/server.go index d7dec5b23..8b0f6496f 100644 --- a/third_party/github.com/coreos/raft/server.go +++ b/third_party/github.com/coreos/raft/server.go @@ -94,9 +94,12 @@ type Server interface { Do(command Command) (interface{}, error) TakeSnapshot() error LoadSnapshot() error + AddEventListener(string, EventListener) } type server struct { + *eventDispatcher + name string path string state string @@ -111,7 +114,7 @@ type server struct { mutex sync.RWMutex syncedPeer map[string]bool - c chan *event + c chan *ev electionTimeout time.Duration heartbeatTimeout time.Duration @@ -123,8 +126,8 @@ type server struct { connectionString string } -// An event to be processed by the server's event loop. -type event struct { +// An internal event to be processed by the server's event loop. +type ev struct { target interface{} returnValue interface{} c chan error @@ -136,7 +139,11 @@ type event struct { // //------------------------------------------------------------------------------ -// Creates a new server with a log at the given path. +// Creates a new server with a log at the given path. transporter must +// not be nil. stateMachine can be nil if snapshotting and log +// compaction is to be disabled. context can be anything (including nil) +// and is not used by the raft package except returned by +// Server.Context(). connectionString can be anything. func NewServer(name string, path string, transporter Transporter, stateMachine StateMachine, context interface{}, connectionString string) (Server, error) { if name == "" { return nil, errors.New("raft.Server: Name cannot be blank") @@ -154,12 +161,13 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S state: Stopped, peers: make(map[string]*Peer), log: newLog(), - c: make(chan *event, 256), + c: make(chan *ev, 256), electionTimeout: DefaultElectionTimeout, heartbeatTimeout: DefaultHeartbeatTimeout, maxLogEntriesPerRequest: MaxLogEntriesPerRequest, connectionString: connectionString, } + s.eventDispatcher = newEventDispatcher(s) // Setup apply function. s.log.ApplyFunc = func(c Command) (interface{}, error) { @@ -246,19 +254,37 @@ func (s *server) State() string { func (s *server) setState(state string) { s.mutex.Lock() defer s.mutex.Unlock() + + // Temporarily store previous values. + prevState := s.state + prevLeader := s.leader + + // Update state and leader. s.state = state if state == Leader { s.leader = s.Name() } + + // Dispatch state and leader change events. + if prevState != state { + s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState)) + } + if prevLeader != s.leader { + s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader)) + } } // Retrieves the current term of the server. func (s *server) Term() uint64 { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.currentTerm } // Retrieves the current commit index of the server. func (s *server) CommitIndex() uint64 { + s.log.mutex.RLock() + defer s.log.mutex.RUnlock() return s.log.commitIndex } @@ -375,7 +401,7 @@ func init() { func (s *server) Start() error { // Exit if the server is already running. - if s.state != Stopped { + if s.State() != Stopped { return errors.New("raft.Server: Server already running") } @@ -443,22 +469,34 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) { s.mutex.Lock() defer s.mutex.Unlock() - // update the term and clear vote for + // Store previous values temporarily. + prevState := s.state + prevTerm := s.currentTerm + prevLeader := s.leader + if term > s.currentTerm { + // update the term and clear vote for s.state = Follower s.currentTerm = term s.leader = leaderName s.votedFor = "" - return - } - - // discover new leader when candidate - // save leader name when follower - if term == s.currentTerm && s.state != Leader && append { + } else if term == s.currentTerm && s.state != Leader && append { + // discover new leader when candidate + // save leader name when follower s.state = Follower s.leader = leaderName } + // Dispatch change events. + if prevState != s.state { + s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState)) + } + if prevLeader != s.leader { + s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader)) + } + if prevTerm != s.currentTerm { + s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm)) + } } //-------------------------------------- @@ -512,8 +550,8 @@ func (s *server) send(value interface{}) (interface{}, error) { return event.returnValue, err } -func (s *server) sendAsync(value interface{}) *event { - event := &event{target: value, c: make(chan error, 1)} +func (s *server) sendAsync(value interface{}) *ev { + event := &ev{target: value, c: make(chan error, 1)} s.c <- event return event } @@ -588,7 +626,13 @@ func (s *server) followerLoop() { // The event loop that is run when the server is in a Candidate state. func (s *server) candidateLoop() { lastLogIndex, lastLogTerm := s.log.lastInfo() + + // Clear leader value. + prevLeader := s.leader s.leader = "" + if prevLeader != s.leader { + s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader)) + } for { // Increment current term, vote for self. @@ -765,7 +809,7 @@ func (s *server) Do(command Command) (interface{}, error) { } // Processes a command. -func (s *server) processCommand(command Command, e *event) { +func (s *server) processCommand(command Command, e *ev) { s.debugln("server.command.process") // Create an entry for the command in the log. @@ -866,7 +910,7 @@ func (s *server) processAppendEntriesRequest(req *AppendEntriesRequest) (*Append func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { // If we find a higher term then change to a follower and exit. - if resp.Term > s.currentTerm { + if resp.Term > s.Term() { s.setCurrentTerm(resp.Term, "", false) return } @@ -914,6 +958,7 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) { default: panic("server unable to send signal to commit channel") } + entry.commit = nil } } } @@ -937,7 +982,7 @@ func (s *server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { func (s *server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { // If the request is coming from an old term then reject it. - if req.Term < s.currentTerm { + if req.Term < s.Term() { s.debugln("server.rv.error: stale term") return newRequestVoteResponse(s.currentTerm, false), false } @@ -989,6 +1034,8 @@ func (s *server) AddPeer(name string, connectiongString string) error { } s.peers[peer.Name] = peer + + s.DispatchEvent(newEvent(AddPeerEventType, name, nil)) } // Write the configuration to file. @@ -1015,6 +1062,8 @@ func (s *server) RemovePeer(name string) error { } delete(s.peers, name) + + s.DispatchEvent(newEvent(RemovePeerEventType, name, nil)) } // Write the configuration to file. @@ -1317,7 +1366,7 @@ func (s *server) readConf() error { //-------------------------------------- func (s *server) debugln(v ...interface{}) { - debugf("[%s Term:%d] %s", s.name, s.currentTerm, fmt.Sprintln(v...)) + debugf("[%s Term:%d] %s", s.name, s.Term(), fmt.Sprintln(v...)) } func (s *server) traceln(v ...interface{}) { diff --git a/third_party/github.com/coreos/raft/server_test.go b/third_party/github.com/coreos/raft/server_test.go index 792ed90f3..f8be91e5f 100644 --- a/third_party/github.com/coreos/raft/server_test.go +++ b/third_party/github.com/coreos/raft/server_test.go @@ -1,6 +1,7 @@ package raft import ( + "encoding/json" "fmt" "reflect" "strconv" @@ -44,7 +45,10 @@ func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) { t.Fatalf("Server %s unable to join: %v", s.Name(), err) } + s.(*server).mutex.Lock() s.(*server).currentTerm = 2 + s.(*server).mutex.Unlock() + defer s.Stop() resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0)) if resp.Term != 2 || resp.VoteGranted { @@ -64,7 +68,9 @@ func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) { t.Fatalf("Server %s unable to join: %v", s.Name(), err) } + s.(*server).mutex.Lock() s.(*server).currentTerm = 2 + s.(*server).mutex.Unlock() defer s.Stop() resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0)) if resp.Term != 2 || !resp.VoteGranted { @@ -87,7 +93,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) { time.Sleep(time.Millisecond * 100) + s.(*server).mutex.Lock() s.(*server).currentTerm = 2 + s.(*server).mutex.Unlock() defer s.Stop() resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1)) if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" { @@ -235,7 +243,9 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) { s.Start() defer s.Stop() + s.(*server).mutex.Lock() s.(*server).currentTerm = 2 + s.(*server).mutex.Unlock() // Append single entry. e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10}) @@ -328,13 +338,23 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { mutex.RLock() target := servers[peer.Name] mutex.RUnlock() - return target.RequestVote(req) + + b, _ := json.Marshal(req) + clonedReq := &RequestVoteRequest{} + json.Unmarshal(b, clonedReq) + + return target.RequestVote(clonedReq) } transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() target := servers[peer.Name] mutex.RUnlock() - return target.AppendEntries(req) + + b, _ := json.Marshal(req) + clonedReq := &AppendEntriesRequest{} + json.Unmarshal(b, clonedReq) + + return target.AppendEntries(clonedReq) } disTransporter := &testTransporter{} @@ -359,7 +379,9 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) { for _, name := range names { s := newTestServer(name, transporter) + mutex.Lock() servers[name] = s + mutex.Unlock() paths[name] = s.Path() if name == "1" { @@ -474,13 +496,23 @@ func TestServerMultiNode(t *testing.T) { mutex.RLock() target := servers[peer.Name] mutex.RUnlock() - return target.RequestVote(req) + + b, _ := json.Marshal(req) + clonedReq := &RequestVoteRequest{} + json.Unmarshal(b, clonedReq) + + return target.RequestVote(clonedReq) } transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse { mutex.RLock() target := servers[peer.Name] mutex.RUnlock() - return target.AppendEntries(req) + + b, _ := json.Marshal(req) + clonedReq := &AppendEntriesRequest{} + json.Unmarshal(b, clonedReq) + + return target.AppendEntries(clonedReq) } disTransporter := &testTransporter{} diff --git a/third_party/github.com/coreos/raft/statemachine.go b/third_party/github.com/coreos/raft/statemachine.go index e59036cef..a0a22e8a3 100644 --- a/third_party/github.com/coreos/raft/statemachine.go +++ b/third_party/github.com/coreos/raft/statemachine.go @@ -7,7 +7,8 @@ package raft //------------------------------------------------------------------------------ // StateMachine is the interface for allowing the host application to save and -// recovery the state machine +// recovery the state machine. This makes it possible to make snapshots +// and compact the log. type StateMachine interface { Save() ([]byte, error) Recovery([]byte) error diff --git a/third_party/github.com/coreos/raft/test.go b/third_party/github.com/coreos/raft/test.go index 5b323f749..dfe0a39f2 100644 --- a/third_party/github.com/coreos/raft/test.go +++ b/third_party/github.com/coreos/raft/test.go @@ -12,6 +12,10 @@ const ( testElectionTimeout = 200 * time.Millisecond ) +const ( + testListenerLoggerEnabled = false +) + func init() { RegisterCommand(&testCommand1{}) RegisterCommand(&testCommand2{}) @@ -66,6 +70,15 @@ func newTestServer(name string, transporter Transporter) Server { panic(err.Error()) } server, _ := NewServer(name, p, transporter, nil, nil, "") + if testListenerLoggerEnabled { + fn := func(e Event) { + server := e.Source().(Server) + warnf("[%s] %s %v -> %v\n", server.Name(), e.Type(), e.PrevValue(), e.Value()) + } + server.AddEventListener(StateChangeEventType, fn) + server.AddEventListener(LeaderChangeEventType, fn) + server.AddEventListener(TermChangeEventType, fn) + } return server }