From 449cad465826045bbe7dffb818e00e8def912a74 Mon Sep 17 00:00:00 2001 From: Brandon Philips Date: Sun, 18 Aug 2013 19:43:13 -0700 Subject: [PATCH] bump(github.com/coreos/go-raft): bb7f7ec92e4cb6d98241cea83f55d0e85e624189 --- .../github.com/coreos/go-raft/README.md | 56 +++- .../github.com/coreos/go-raft/config.go | 7 + .../coreos/go-raft/http_transporter.go | 4 +- .../github.com/coreos/go-raft/join_command.go | 5 +- third_party/github.com/coreos/go-raft/log.go | 9 + third_party/github.com/coreos/go-raft/peer.go | 100 +++--- .../protobuf/snapshot_recovery_request.pb.go | 40 ++- .../protobuf/snapshot_recovery_request.proto | 10 +- .../github.com/coreos/go-raft/server.go | 291 +++++++++--------- .../github.com/coreos/go-raft/server_test.go | 126 +++++++- .../github.com/coreos/go-raft/snapshot.go | 6 +- .../go-raft/snapshot_recovery_request.go | 26 +- third_party/github.com/coreos/go-raft/test.go | 7 +- 13 files changed, 461 insertions(+), 226 deletions(-) create mode 100644 third_party/github.com/coreos/go-raft/config.go diff --git a/third_party/github.com/coreos/go-raft/README.md b/third_party/github.com/coreos/go-raft/README.md index f948a5e9c..6ecc74866 100644 --- a/third_party/github.com/coreos/go-raft/README.md +++ b/third_party/github.com/coreos/go-raft/README.md @@ -1,19 +1,49 @@ -[![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft) - go-raft ======= +[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft) + ## Overview -This is an Go implementation of the Raft distributed consensus protocol. +This is a Go implementation of the Raft distributed consensus protocol. Raft is a protocol by which a cluster of nodes can maintain a replicated state machine. The state machine is kept in sync through the use of a replicated log. -For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout. +For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm][raft-paper] by Diego Ongaro and John Ousterhout. +## Project Status + +This library is feature complete but should be considered experimental until it has seen more usage. +If you have any questions on implementing go-raft in your project please file an issue. +There is an [active community][community] of developers who can help. +go-raft is under the MIT license. + +[community]: https://github.com/goraft/raft/contributors + +### Features + +- Leader election +- Log replication +- Configuration changes +- Log compaction +- Unit tests +- Fast Protobuf Log Encoding +- HTTP transport + +### Projects + +These projects are built on go-raft: + +- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery +- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation for using the go-raft library for distributed consensus. + +If you have a project that you're using go-raft in, please add it to this README so others can see implementation examples. ## The Raft Protocol +This section provides a summary of the Raft protocol from a high level. +For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm][raft-paper]. 
+
 ### Overview
 
 Maintaining state in a single process on a single server is easy.
@@ -26,7 +56,7 @@ Servers can crash or the network between two machines can become unavailable or
 A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster.
 Many distributed systems are built upon the Paxos protocol but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.
-An alternative is the [Raft distributed consensus protocol](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
+An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
 Raft is a protocol built with understandability as a primary tenet and it centers around two things:
 
 1. Leader Election
@@ -53,17 +83,9 @@ By ensuring that this log is replicated identically between all the nodes in th
 Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
 Each peer will append the entries from the leader through a 2-phase commit process which ensures that a majority of servers in the cluster have the entries written to the log.
 
-For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)
+## History
 
+Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).
+He put it under the MIT license in the hopes that it would be useful for other projects too.
 
-## Project Status
-
-The go-raft library is feature complete but in alpha.
-There is a reference implementation called [raftd](https://github.com/benbjohnson/raftd) that demonstrates how to use the library
-
-The library will be considered experimental until it has significant production usage.
-I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called [Sky](https://github.com/skydb/sky).
-However, I hope other projects can benefit from having a distributed consensus protocol so the go-raft library is available under MIT license.
-
-If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples.
-If you have any questions on implementing go-raft in your project, feel free to contact me on [GitHub](https://github.com/benbjohnson), [Twitter](https://twitter.com/benbjohnson) or by e-mail at [ben@skylandlabs.com](mailto:ben@skylandlabs.com).
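For orientation, here is a minimal usage sketch of the library's public API as it stands after this bump. Every identifier below appears elsewhere in this patch; the concrete timeouts, the on-disk path, and the address format are illustrative, and `NewServer`'s two trailing arguments (the state machine and an opaque context) are passed as `nil`, as the library's own test helpers do.

```go
package main

import (
	"time"

	raft "github.com/coreos/go-raft"
)

func main() {
	// The transporter carries RPCs between servers; the library ships an
	// HTTP implementation, but any raft.Transporter works. Setup elided.
	var transporter raft.Transporter

	// Mirrors the call in test.go: NewServer(name, path, transporter, nil, nil).
	server, err := raft.NewServer("node1", "/tmp/raft-node1", transporter, nil, nil)
	if err != nil {
		panic(err)
	}

	server.SetHeartbeatTimeout(50 * time.Millisecond)
	server.SetElectionTimeout(300 * time.Millisecond)
	if err := server.Start(); err != nil {
		panic(err)
	}

	// With an empty log, a self-join promotes the node straight to leader
	// (see the self-join branch in followerLoop below).
	if _, err := server.Do(&raft.DefaultJoinCommand{
		Name:             "node1",
		ConnectionString: "http://localhost:4001", // assumed address format
	}); err != nil {
		panic(err)
	}
}
```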
+[raft-paper]: https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf diff --git a/third_party/github.com/coreos/go-raft/config.go b/third_party/github.com/coreos/go-raft/config.go new file mode 100644 index 000000000..d202dea0d --- /dev/null +++ b/third_party/github.com/coreos/go-raft/config.go @@ -0,0 +1,7 @@ +package raft + +type Config struct { + CommitIndex uint64 `json:"commitIndex"` + // TODO decide what we need to store in peer struct + Peers []*Peer `json:"peers"` +} diff --git a/third_party/github.com/coreos/go-raft/http_transporter.go b/third_party/github.com/coreos/go-raft/http_transporter.go index 1125f91f5..7dbcf5a40 100644 --- a/third_party/github.com/coreos/go-raft/http_transporter.go +++ b/third_party/github.com/coreos/go-raft/http_transporter.go @@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r return nil } - url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath()) + url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} @@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque return nil } - url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath()) + url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath()) traceln(server.Name(), "POST", url) client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}} diff --git a/third_party/github.com/coreos/go-raft/join_command.go b/third_party/github.com/coreos/go-raft/join_command.go index 74e14239d..949eaae76 100644 --- a/third_party/github.com/coreos/go-raft/join_command.go +++ b/third_party/github.com/coreos/go-raft/join_command.go @@ -9,7 +9,8 @@ type JoinCommand interface { // Join command type DefaultJoinCommand struct { - Name string `json:"name"` + Name string `json:"name"` + ConnectionString string `json:"connectionString"` } // The name of the Join command in the log @@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string { } func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) { - err := server.AddPeer(c.Name) + err := server.AddPeer(c.Name, c.ConnectionString) return []byte("join"), err } diff --git a/third_party/github.com/coreos/go-raft/log.go b/third_party/github.com/coreos/go-raft/log.go index 42553f24e..b686d317c 100644 --- a/third_party/github.com/coreos/go-raft/log.go +++ b/third_party/github.com/coreos/go-raft/log.go @@ -183,6 +183,15 @@ func (l *Log) open(path string) error { // Append entry. l.entries = append(l.entries, entry) + + if entry.Index <= l.commitIndex { + command, err := newCommand(entry.CommandName, entry.Command) + if err != nil { + continue + } + l.ApplyFunc(command) + } + debugln("open.log.append log index ", entry.Index) readBytes += int64(n) diff --git a/third_party/github.com/coreos/go-raft/peer.go b/third_party/github.com/coreos/go-raft/peer.go index e7761dd97..37b8c3fb7 100644 --- a/third_party/github.com/coreos/go-raft/peer.go +++ b/third_party/github.com/coreos/go-raft/peer.go @@ -14,7 +14,8 @@ import ( // A peer is a reference to another server involved in the consensus protocol. 
 type Peer struct {
 	server           *Server
-	name             string
+	Name             string `json:"name"`
+	ConnectionString string `json:"connectionString"`
 	prevLogIndex     uint64
 	mutex            sync.RWMutex
 	stopChan         chan bool
@@ -28,10 +29,11 @@ type Peer struct {
 //------------------------------------------------------------------------------
 
 // Creates a new peer.
-func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
+func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
 	return &Peer{
 		server:           server,
-		name:             name,
+		Name:             name,
+		ConnectionString: connectionString,
 		heartbeatTimeout: heartbeatTimeout,
 	}
 }
@@ -42,11 +44,6 @@ func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer
 //
 //------------------------------------------------------------------------------
 
-// Retrieves the name of the peer.
-func (p *Peer) Name() string {
-	return p.name
-}
-
 // Sets the heartbeat timeout.
 func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
 	p.heartbeatTimeout = duration
@@ -89,17 +86,17 @@ func (p *Peer) startHeartbeat() {
 }
 
 // Stops the peer heartbeat.
-func (p *Peer) stopHeartbeat() {
+func (p *Peer) stopHeartbeat(flush bool) {
 
 	// Here is a subtle problem: the previous stop channel had no buffer,
 	// so the leader could get blocked on this send
-	// when the heartbeat returned at line 132.
+	// when the heartbeat returned.
 	// The channel now has a buffer of 1,
 	// and we panic if the send still cannot proceed.
 
 	select {
-	case p.stopChan <- true:
+	case p.stopChan <- flush:
 
 	default:
-		panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
+		panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
 	}
 }
 
@@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer {
 	p.mutex.Lock()
 	defer p.mutex.Unlock()
 	return &Peer{
-		name:         p.name,
-		prevLogIndex: p.prevLogIndex,
+		Name:             p.Name,
+		ConnectionString: p.ConnectionString,
+		prevLogIndex:     p.prevLogIndex,
 	}
 }
 
@@ -128,46 +126,58 @@ func (p *Peer) heartbeat(c chan bool) {
 
 	c <- true
 
-	debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
+	debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
 
 	for {
 		select {
-		case <-stopChan:
-			debugln("peer.heartbeat.stop: ", p.Name())
-			return
-
-		case <-time.After(p.heartbeatTimeout):
-			debugln("peer.heartbeat.run: ", p.Name())
-			prevLogIndex := p.getPrevLogIndex()
-			entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
-
-			if p.server.State() != Leader {
+		case flush := <-stopChan:
+			if !flush {
+				debugln("peer.heartbeat.stop: ", p.Name)
+				return
+			} else {
+				// Before we can safely remove a node,
+				// we must first flush the remove command to that node.
+				p.flush()
+				debugln("peer.heartbeat.stop: ", p.Name)
 				return
 			}
 
-			if entries != nil {
-				p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
-			} else {
-				p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
-			}
+		case <-time.After(p.heartbeatTimeout):
+			p.flush()
 		}
 	}
 }
 
+func (p *Peer) flush() {
+	debugln("peer.heartbeat.run: ", p.Name)
+	prevLogIndex := p.getPrevLogIndex()
+	entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
+
+	if p.server.State() != Leader {
+		return
+	}
+
+	if entries != nil {
+		p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
+	} else {
+		p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
+	}
+}
+
 //--------------------------------------
 // Append Entries
 //--------------------------------------
 
 // Sends an AppendEntries request to the peer through the transport.
 func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
-	traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
+	traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
 
 	resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
 	if resp == nil {
-		debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
+		debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
 		return
 	}
-	traceln("peer.flush.recv: ", p.Name())
+	traceln("peer.flush.recv: ", p.Name)
 
 	// If successful then update the previous log index.
 	p.mutex.Lock()
@@ -181,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
 			resp.append = true
 		}
 	}
-	traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
+	traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
 
 	// If it was unsuccessful then decrement the previous log index and
 	// we'll try again next time.
@@ -195,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
 
 			p.prevLogIndex = resp.CommitIndex
 
-			debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
+			debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
 		} else if p.prevLogIndex > 0 {
 			// Decrement the previous log index down until we find a match. Don't
 			// let it go below where the peer's commit index is though. That's a
@@ -206,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
 				p.prevLogIndex = resp.Index
 			}
 
-			debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
+			debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
 		}
 	}
 	p.mutex.Unlock()
 
 	// Attach the peer to resp so the server knows where it came from.
-	resp.peer = p.Name()
+	resp.peer = p.Name
 
 	// Send response to server for processing.
 	p.server.send(resp)
 }
 
 // Sends a Snapshot request to the peer through the transport.
 func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
-	debugln("peer.snap.send: ", p.name)
+	debugln("peer.snap.send: ", p.Name)
 
 	resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
 	if resp == nil {
-		debugln("peer.snap.timeout: ", p.name)
+		debugln("peer.snap.timeout: ", p.Name)
 		return
 	}
 
-	debugln("peer.snap.recv: ", p.name)
+	debugln("peer.snap.recv: ", p.Name)
 
 	// If successful, the peer should have switched to snapshot state.
 	// Send it the snapshot!
 	if resp.Success {
 		p.sendSnapshotRecoveryRequest()
 	} else {
-		debugln("peer.snap.failed: ", p.name)
+		debugln("peer.snap.failed: ", p.Name)
 		return
 	}
 
@@ -243,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
 // Sends a Snapshot Recovery request to the peer through the transport.
 func (p *Peer) sendSnapshotRecoveryRequest() {
 	req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
-	debugln("peer.snap.recovery.send: ", p.name)
+	debugln("peer.snap.recovery.send: ", p.Name)
 	resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
 
 	if resp.Success {
 		p.prevLogIndex = req.LastIndex
 	} else {
-		debugln("peer.snap.recovery.failed: ", p.name)
+		debugln("peer.snap.recovery.failed: ", p.Name)
 		return
 	}
 	// Send response to server for processing.
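The buffered stop channel above is easy to miss but load-bearing, so here is the pattern in isolation: a condensed, self-contained sketch (not library code) of how `stopHeartbeat(flush)` hands the flush flag to the heartbeat goroutine without ever blocking the caller.

```go
package main

import "fmt"

// A condensed sketch of the stop-signal pattern peer.go uses: the stop
// channel carries a "flush" flag and has a buffer of one, so the sender
// never blocks even if the heartbeat goroutine is busy or has already
// exited. A send that cannot proceed means a second stop was issued
// before the first was consumed, which the library treats as a bug.
type peer struct {
	stopChan chan bool
}

func (p *peer) stopHeartbeat(flush bool) {
	select {
	case p.stopChan <- flush:
	default:
		panic("cannot stop heartbeat")
	}
}

func main() {
	p := &peer{stopChan: make(chan bool, 1)} // buffer of one is essential
	p.stopHeartbeat(true)                    // flush outstanding entries, then stop
	fmt.Println(<-p.stopChan)                // heartbeat loop would receive: true
}
```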
@@ -261,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() { // send VoteRequest Request func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) { - debugln("peer.vote: ", p.server.Name(), "->", p.Name()) + debugln("peer.vote: ", p.server.Name(), "->", p.Name) req.peer = p if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil { - debugln("peer.vote: recv", p.server.Name(), "<-", p.Name()) + debugln("peer.vote: recv", p.server.Name(), "<-", p.Name) resp.peer = p c <- resp } diff --git a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go index f580de6ab..22a281236 100644 --- a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go +++ b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.pb.go @@ -14,12 +14,12 @@ var _ = &json.SyntaxError{} var _ = math.Inf type ProtoSnapshotRecoveryRequest struct { - LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"` - LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"` - LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"` - Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"` - State []byte `protobuf:"bytes,5,req" json:"State,omitempty"` - XXX_unrecognized []byte `json:"-"` + LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"` + LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"` + LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"` + Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"` + State []byte `protobuf:"bytes,5,req" json:"State,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} } @@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 { return 0 } -func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string { +func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer { if m != nil { return m.Peers } @@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte { return nil } +type ProtoSnapshotRecoveryRequest_ProtoPeer struct { + Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"` + ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() { + *m = ProtoSnapshotRecoveryRequest_ProtoPeer{} +} +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) } +func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string { + if m != nil && m.Name != nil { + return *m.Name + } + return "" +} + +func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string { + if m != nil && m.ConnectionString != nil { + return *m.ConnectionString + } + return "" +} + func init() { } diff --git a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto index 000c54d48..e84cca30c 100644 --- a/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto +++ b/third_party/github.com/coreos/go-raft/protobuf/snapshot_recovery_request.proto @@ -3,7 +3,13 @@ package 
protobuf; message ProtoSnapshotRecoveryRequest { required string LeaderName=1; required uint64 LastIndex=2; - required uint64 LastTerm=3; - repeated string Peers=4; + required uint64 LastTerm=3; + + message ProtoPeer { + required string Name=1; + required string ConnectionString=2; + } + repeated ProtoPeer Peers=4; + required bytes State=5; } \ No newline at end of file diff --git a/third_party/github.com/coreos/go-raft/server.go b/third_party/github.com/coreos/go-raft/server.go index fbf5c94b2..c3d3fbd46 100644 --- a/third_party/github.com/coreos/go-raft/server.go +++ b/third_party/github.com/coreos/go-raft/server.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "hash/crc32" - "io" "io/ioutil" "os" "path" @@ -81,8 +80,6 @@ type Server struct { lastSnapshot *Snapshot stateMachine StateMachine maxLogEntriesPerRequest uint64 - - confFile *os.File } // An event to be processed by the server's event loop. @@ -272,11 +269,15 @@ func (s *Server) QuorumSize() int { // Retrieves the election timeout. func (s *Server) ElectionTimeout() time.Duration { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.electionTimeout } // Sets the election timeout. func (s *Server) SetElectionTimeout(duration time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() s.electionTimeout = duration } @@ -286,6 +287,8 @@ func (s *Server) SetElectionTimeout(duration time.Duration) { // Retrieves the heartbeat timeout. func (s *Server) HeartbeatTimeout() time.Duration { + s.mutex.RLock() + defer s.mutex.RUnlock() return s.heartbeatTimeout } @@ -332,14 +335,14 @@ func (s *Server) Start() error { // Create snapshot directory if not exist os.Mkdir(path.Join(s.path, "snapshot"), 0700) - // Initialize the log and load it up. - if err := s.log.open(s.LogPath()); err != nil { - s.debugln("raft: Log error: ", err) + if err := s.readConf(); err != nil { + s.debugln("raft: Conf file error: ", err) return fmt.Errorf("raft: Initialization error: %s", err) } - if err := s.readConf(); err != nil { - s.debugln("raft: Conf file error: ", err) + // Initialize the log and load it up. + if err := s.log.open(s.LogPath()); err != nil { + s.debugln("raft: Log error: ", err) return fmt.Errorf("raft: Initialization error: %s", err) } @@ -368,59 +371,12 @@ func (s *Server) Start() error { return nil } -// Read the configuration for the server. -func (s *Server) readConf() error { - var err error - confPath := path.Join(s.path, "conf") - s.debugln("readConf.open ", confPath) - // open conf file - s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600) - - if err != nil { - if os.IsNotExist(err) { - s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600) - debugln("readConf.create ", confPath) - if err != nil { - return err - } - } - return err - } - - peerNames := make([]string, 0) - - for { - var peerName string - _, err = fmt.Fscanf(s.confFile, "%s\n", &peerName) - - if err != nil { - if err == io.EOF { - s.debugln("server.peer.conf: finish") - break - } - return err - } - s.debugln("server.peer.conf.read: ", peerName) - - peerNames = append(peerNames, peerName) - } - - s.confFile.Truncate(0) - s.confFile.Seek(0, os.SEEK_SET) - - for _, peerName := range peerNames { - s.AddPeer(peerName) - } - - return nil -} - // Shuts down the server. func (s *Server) Stop() { s.send(&stopValue) s.mutex.Lock() + defer s.mutex.Unlock() s.log.close() - s.mutex.Unlock() } // Checks if the server is currently running. 
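Since the old line-per-peer conf format is gone, it may help to see what the new JSON conf looks like on disk. The sketch below mirrors the `Config` struct added by this patch and the exported fields of `Peer`; the concrete values are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of Config and the exported part of Peer, just to show the
// on-disk conf format; unexported Peer fields are never serialized.
type Peer struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

type Config struct {
	CommitIndex uint64  `json:"commitIndex"`
	Peers       []*Peer `json:"peers"`
}

func main() {
	// Marshal error ignored here for brevity, as writeConf itself does.
	b, _ := json.Marshal(&Config{
		CommitIndex: 6,
		Peers: []*Peer{
			{Name: "2", ConnectionString: "http://localhost:4002"},
		},
	})
	fmt.Println(string(b))
	// Output: {"commitIndex":6,"peers":[{"name":"2","connectionString":"http://localhost:4002"}]}
}
```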
@@ -532,24 +488,27 @@ func (s *Server) followerLoop() { case e := <-s.c: if e.target == &stopValue { s.setState(Stopped) - } else if command, ok := e.target.(JoinCommand); ok { - //If no log entries exist and a self-join command is issued - //then immediately become leader and commit entry. - if s.log.currentIndex() == 0 && command.NodeName() == s.Name() { - s.debugln("selfjoin and promote to leader") - s.setState(Leader) - s.processCommand(command, e) - } else { + } else { + switch req := e.target.(type) { + case JoinCommand: + //If no log entries exist and a self-join command is issued + //then immediately become leader and commit entry. + if s.log.currentIndex() == 0 && req.NodeName() == s.Name() { + s.debugln("selfjoin and promote to leader") + s.setState(Leader) + s.processCommand(req, e) + } else { + err = NotLeaderError + } + case *AppendEntriesRequest: + e.returnValue, update = s.processAppendEntriesRequest(req) + case *RequestVoteRequest: + e.returnValue, update = s.processRequestVoteRequest(req) + case *SnapshotRequest: + e.returnValue = s.processSnapshotRequest(req) + default: err = NotLeaderError } - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, update = s.processAppendEntriesRequest(req) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, update = s.processRequestVoteRequest(req) - } else if req, ok := e.target.(*SnapshotRequest); ok { - e.returnValue = s.processSnapshotRequest(req) - } else { - err = NotLeaderError } // Callback to event. @@ -629,14 +588,16 @@ func (s *Server) candidateLoop() { var err error if e.target == &stopValue { s.setState(Stopped) - } else if _, ok := e.target.(Command); ok { - err = NotLeaderError - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, _ = s.processAppendEntriesRequest(req) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, _ = s.processRequestVoteRequest(req) + } else { + switch req := e.target.(type) { + case Command: + err = NotLeaderError + case *AppendEntriesRequest: + e.returnValue, _ = s.processAppendEntriesRequest(req) + case *RequestVoteRequest: + e.returnValue, _ = s.processRequestVoteRequest(req) + } } - // Callback to event. e.c <- err @@ -660,7 +621,7 @@ func (s *Server) candidateLoop() { } } -// The event loop that is run when the server is in a Candidate state. +// The event loop that is run when the server is in a Leader state. func (s *Server) leaderLoop() { s.setState(Leader) s.syncedPeer = make(map[string]bool) @@ -682,15 +643,18 @@ func (s *Server) leaderLoop() { case e := <-s.c: if e.target == &stopValue { s.setState(Stopped) - } else if command, ok := e.target.(Command); ok { - s.processCommand(command, e) - continue - } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue, _ = s.processAppendEntriesRequest(req) - } else if resp, ok := e.target.(*AppendEntriesResponse); ok { - s.processAppendEntriesResponse(resp) - } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue, _ = s.processRequestVoteRequest(req) + } else { + switch req := e.target.(type) { + case Command: + s.processCommand(req, e) + continue + case *AppendEntriesRequest: + e.returnValue, _ = s.processAppendEntriesRequest(req) + case *AppendEntriesResponse: + s.processAppendEntriesResponse(req) + case *RequestVoteRequest: + e.returnValue, _ = s.processRequestVoteRequest(req) + } } // Callback to event. @@ -705,7 +669,7 @@ func (s *Server) leaderLoop() { // Stop all peers. 
 		for _, peer := range s.peers {
-			peer.stopHeartbeat()
+			peer.stopHeartbeat(false)
 		}
 		s.syncedPeer = nil
 	}
@@ -720,16 +684,18 @@ func (s *Server) snapshotLoop() {
 
 		if e.target == &stopValue {
 			s.setState(Stopped)
-		} else if _, ok := e.target.(Command); ok {
-			err = NotLeaderError
-		} else if req, ok := e.target.(*AppendEntriesRequest); ok {
-			e.returnValue, _ = s.processAppendEntriesRequest(req)
-		} else if req, ok := e.target.(*RequestVoteRequest); ok {
-			e.returnValue, _ = s.processRequestVoteRequest(req)
-		} else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
-			e.returnValue = s.processSnapshotRecoveryRequest(req)
+		} else {
+			switch req := e.target.(type) {
+			case Command:
+				err = NotLeaderError
+			case *AppendEntriesRequest:
+				e.returnValue, _ = s.processAppendEntriesRequest(req)
+			case *RequestVoteRequest:
+				e.returnValue, _ = s.processRequestVoteRequest(req)
+			case *SnapshotRecoveryRequest:
+				e.returnValue = s.processSnapshotRecoveryRequest(req)
+			}
 		}
-
 		// Callback to event.
 		e.c <- err
@@ -959,31 +925,29 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
 //--------------------------------------
 
 // Adds a peer to the server.
-func (s *Server) AddPeer(name string) error {
+func (s *Server) AddPeer(name string, connectionString string) error {
 	s.debugln("server.peer.add: ", name, len(s.peers))
-
+	defer s.writeConf()
 	// Do not allow peers to be added twice.
 	if s.peers[name] != nil {
 		return nil
 	}
 
-	// Only add the peer if it doesn't have the same name.
-	if s.name != name {
-		// when loading snapshot s.confFile should be nil
-		if s.confFile != nil {
-			_, err := fmt.Fprintln(s.confFile, name)
-			s.debugln("server.peer.conf.write: ", name)
-			if err != nil {
-				return err
-			}
-		}
-		peer := newPeer(s, name, s.heartbeatTimeout)
-		if s.State() == Leader {
-			peer.startHeartbeat()
-		}
-		s.peers[peer.name] = peer
+	// Skip the peer if it has the same name as the server.
+	if s.name == name {
+		return nil
 	}
 
+	peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
+
+	if s.State() == Leader {
+		peer.startHeartbeat()
+	}
+
+	s.peers[peer.Name] = peer
+
+	s.debugln("server.peer.conf.write: ", name)
+
 	return nil
 }
 
@@ -991,8 +955,12 @@ func (s *Server) RemovePeer(name string) error {
 	s.debugln("server.peer.remove: ", name, len(s.peers))
 
-	// Ignore removal of the server itself.
-	if s.name == name {
+	defer s.writeConf()
+
+	if name == s.Name() {
+		// When the removed node restarts, it should be able to tell that
+		// it was removed before, so we need to update knownCommitIndex.
 		return nil
 	}
 	// Return error if peer doesn't exist.
@@ -1001,23 +969,13 @@ func (s *Server) RemovePeer(name string) error {
 		return fmt.Errorf("raft: Peer not found: %s", name)
 	}
 
-	// TODO: Flush entries to the peer first.
-
 	// Stop peer and remove it.
- peer.stopHeartbeat() + if s.State() == Leader { + peer.stopHeartbeat(true) + } delete(s.peers, name) - s.confFile.Truncate(0) - s.confFile.Seek(0, os.SEEK_SET) - - for peer := range s.peers { - _, err := fmt.Fprintln(s.confFile, peer) - if err != nil { - return err - } - } - return nil } @@ -1054,14 +1012,13 @@ func (s *Server) TakeSnapshot() error { state = []byte{0} } - var peerNames []string + var peers []*Peer for _, peer := range s.peers { - peerNames = append(peerNames, peer.Name()) + peers = append(peers, peer.clone()) } - peerNames = append(peerNames, s.Name()) - s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path} + s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path} s.saveSnapshot() @@ -1144,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S s.peers = make(map[string]*Peer) // recovery the cluster configuration - for _, peerName := range req.Peers { - s.AddPeer(peerName) + for _, peer := range req.Peers { + s.AddPeer(peer.Name, peer.ConnectionString) } //update term and index @@ -1237,8 +1194,8 @@ func (s *Server) LoadSnapshot() error { return err } - for _, peerName := range s.lastSnapshot.Peers { - s.AddPeer(peerName) + for _, peer := range s.lastSnapshot.Peers { + s.AddPeer(peer.Name, peer.ConnectionString) } s.log.startTerm = s.lastSnapshot.LastTerm @@ -1248,6 +1205,62 @@ func (s *Server) LoadSnapshot() error { return err } +//-------------------------------------- +// Config File +//-------------------------------------- + +func (s *Server) writeConf() { + + peers := make([]*Peer, len(s.peers)) + + i := 0 + for _, peer := range s.peers { + peers[i] = peer.clone() + i++ + } + + r := &Config{ + CommitIndex: s.log.commitIndex, + Peers: peers, + } + + b, _ := json.Marshal(r) + + confPath := path.Join(s.path, "conf") + tmpConfPath := path.Join(s.path, "conf.tmp") + + err := ioutil.WriteFile(tmpConfPath, b, 0600) + + if err != nil { + panic(err) + } + + os.Rename(tmpConfPath, confPath) +} + +// Read the configuration for the server. 
+func (s *Server) readConf() error {
+	confPath := path.Join(s.path, "conf")
+	s.debugln("readConf.open ", confPath)
+
+	// Open the conf file; a missing file is not an error (fresh server).
+	b, err := ioutil.ReadFile(confPath)
+
+	if err != nil {
+		return nil
+	}
+
+	conf := &Config{}
+
+	if err = json.Unmarshal(b, conf); err != nil {
+		return err
+	}
+
+	s.log.commitIndex = conf.CommitIndex
+
+	return nil
+}
+
 //--------------------------------------
 // Debugging
 //--------------------------------------
diff --git a/third_party/github.com/coreos/go-raft/server_test.go b/third_party/github.com/coreos/go-raft/server_test.go
index 0410846a2..2a1559970 100644
--- a/third_party/github.com/coreos/go-raft/server_test.go
+++ b/third_party/github.com/coreos/go-raft/server_test.go
@@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
 	lookup := map[string]*Server{}
 	transporter := &testTransporter{}
 	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
-		return lookup[peer.Name()].RequestVote(req)
+		return lookup[peer.Name].RequestVote(req)
 	}
 	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
-		return lookup[peer.Name()].AppendEntries(req)
+		return lookup[peer.Name].AppendEntries(req)
 	}
 
 	servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
 	}
 }
 
+//--------------------------------------
+// Recovery
+//--------------------------------------
+
+// Ensure that a server can recover its commit index and peer configuration
+// from a previous log and conf file.
+func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
+	// Initialize the servers.
+	var mutex sync.RWMutex
+	servers := map[string]*Server{}
+
+	transporter := &testTransporter{}
+	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+		mutex.RLock()
+		s := servers[peer.Name]
+		mutex.RUnlock()
+		return s.RequestVote(req)
+	}
+	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+		mutex.RLock()
+		s := servers[peer.Name]
+		mutex.RUnlock()
+		return s.AppendEntries(req)
+	}
+
+	disTransporter := &testTransporter{}
+	disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
+		return nil
+	}
+	disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
+		return nil
+	}
+
+	var names []string
+	var paths = make(map[string]string)
+
+	n := 5
+
+	// add n servers
+	for i := 1; i <= n; i++ {
+		names = append(names, strconv.Itoa(i))
+	}
+
+	var leader *Server
+	for _, name := range names {
+		server := newTestServer(name, transporter)
+
+		servers[name] = server
+		paths[name] = server.Path()
+
+		if name == "1" {
+			leader = server
+			server.SetHeartbeatTimeout(testHeartbeatTimeout)
+			server.Start()
+			time.Sleep(testHeartbeatTimeout)
+		} else {
+			server.SetElectionTimeout(testElectionTimeout)
+			server.SetHeartbeatTimeout(testHeartbeatTimeout)
+			server.Start()
+			time.Sleep(testHeartbeatTimeout)
+		}
+		if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
+			t.Fatalf("Unable to join server[%s]: %v", name, err)
+		}
+
+	}
+
+	// commit some commands
+	for i := 0; i < 10; i++ {
+		if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
+			t.Fatalf("cannot commit command: %v", err)
+		}
+	}
+
+	time.Sleep(2 * testHeartbeatTimeout)
+
+	for _, name := range names {
+		server := servers[name]
+		if server.CommitIndex() != 16 {
+			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
+		}
+		server.Stop()
+	}
+
+	for _, name := range names {
+		// with old path and disabled transportation
+		server := newTestServerWithPath(name, disTransporter, paths[name])
+		servers[name] = server
+
+		server.Start()
+
+		// should only commit up to the last join command
+		if server.CommitIndex() != 6 {
+			t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
+		}
+
+		// peer conf should be recovered
+		if len(server.Peers()) != 4 {
+			t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
+		}
+	}
+
+	// let nodes talk to each other
+	for _, name := range names {
+		servers[name].SetTransporter(transporter)
+	}
+
+	time.Sleep(2 * testElectionTimeout)
+
+	// should commit to the previous index + 1 (nop command when new leader elected)
+	for _, name := range names {
+		server := servers[name]
+		if server.CommitIndex() != 17 {
+			t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 17)
+		}
+		server.Stop()
+	}
+}
+
 //--------------------------------------
 // Membership
 //--------------------------------------
@@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
 	transporter := &testTransporter{}
 	transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
 		mutex.RLock()
-		s := servers[peer.name]
+		s := servers[peer.Name]
 		mutex.RUnlock()
 		return s.RequestVote(req)
 	}
 	transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
 		mutex.RLock()
-		s := servers[peer.name]
+		s := servers[peer.Name]
 		mutex.RUnlock()
 		return s.AppendEntries(req)
 	}
diff --git a/third_party/github.com/coreos/go-raft/snapshot.go b/third_party/github.com/coreos/go-raft/snapshot.go
index d35474f8a..fd41c08f0 100644
--- a/third_party/github.com/coreos/go-raft/snapshot.go
+++ b/third_party/github.com/coreos/go-raft/snapshot.go
@@ -21,9 +21,9 @@ type Snapshot struct {
 	LastIndex uint64 `json:"lastIndex"`
 	LastTerm  uint64 `json:"lastTerm"`
 	// cluster configuration.
-	Peers []string `json: "peers"`
-	State []byte   `json: "state"`
-	Path  string   `json: "path"`
+	Peers []*Peer `json:"peers"`
+	State []byte  `json:"state"`
+	Path  string  `json:"path"`
 }
 
 // Save the snapshot to a file
diff --git a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go
index e6a0efe8e..57b3e3a88 100644
--- a/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go
+++ b/third_party/github.com/coreos/go-raft/snapshot_recovery_request.go
@@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
 	LeaderName string
 	LastIndex  uint64
 	LastTerm   uint64
-	Peers      []string
+	Peers      []*Peer
 	State      []byte
 }
 
@@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
 
 // Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
 // written and any error that may have occurred.
 func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
+
+	protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
+
+	for i, peer := range req.Peers {
+		protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
+			Name:             proto.String(peer.Name),
+			ConnectionString: proto.String(peer.ConnectionString),
+		}
+	}
+
 	pb := &protobuf.ProtoSnapshotRecoveryRequest{
 		LeaderName: proto.String(req.LeaderName),
 		LastIndex:  proto.Uint64(req.LastIndex),
 		LastTerm:   proto.Uint64(req.LastTerm),
-		Peers:      req.Peers,
+		Peers:      protoPeers,
 		State:      req.State,
 	}
 	p, err := proto.Marshal(pb)
@@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
 
 	totalBytes := len(data)
 
-	pb := &protobuf.ProtoSnapshotRequest{}
+	pb := &protobuf.ProtoSnapshotRecoveryRequest{}
 	if err = proto.Unmarshal(data, pb); err != nil {
 		return -1, err
 	}
@@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
 	req.LeaderName = pb.GetLeaderName()
 	req.LastIndex = pb.GetLastIndex()
 	req.LastTerm = pb.GetLastTerm()
-	req.Peers = req.Peers
 	req.State = pb.GetState()
+	req.Peers = make([]*Peer, len(pb.Peers))
+
+	for i, peer := range pb.Peers {
+		req.Peers[i] = &Peer{
+			Name:             peer.GetName(),
+			ConnectionString: peer.GetConnectionString(),
+		}
+	}
+
 	return totalBytes, nil
 }
diff --git a/third_party/github.com/coreos/go-raft/test.go b/third_party/github.com/coreos/go-raft/test.go
index 606594bf7..025cf0f58 100644
--- a/third_party/github.com/coreos/go-raft/test.go
+++ b/third_party/github.com/coreos/go-raft/test.go
@@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
 	return server
 }
 
+func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
+	server, _ := NewServer(name, p, transporter, nil, nil)
+	return server
+}
+
 func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
 	server := newTestServer(name, transporter)
 	f, err := os.Create(server.LogPath())
@@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
 		server.SetHeartbeatTimeout(testHeartbeatTimeout)
 		server.Start()
 		for _, peer := range servers {
-			server.AddPeer(peer.Name())
+			server.AddPeer(peer.Name(), "")
 		}
 	}
 	return servers
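The `encode`/`decode` pair above is the wire path for the new peer list, and a quick round trip makes the change concrete. The following is a hypothetical test, not part of this patch; it uses only types and methods defined above and must live in package raft, since `encode` and `decode` are unexported.

```go
package raft

import (
	"bytes"
	"testing"
)

// Hypothetical check (not in this patch) that peer names and connection
// strings survive the new protobuf round trip.
func TestSnapshotRecoveryRequestEncodeDecode(t *testing.T) {
	req := &SnapshotRecoveryRequest{
		LeaderName: "1",
		LastIndex:  5,
		LastTerm:   2,
		Peers:      []*Peer{{Name: "2", ConnectionString: "http://localhost:4002"}},
		State:      []byte{0},
	}

	// bytes.Buffer serves as both the io.Writer and io.Reader ends.
	var buf bytes.Buffer
	if _, err := req.encode(&buf); err != nil {
		t.Fatalf("encode failed: %v", err)
	}

	decoded := &SnapshotRecoveryRequest{}
	if _, err := decoded.decode(&buf); err != nil {
		t.Fatalf("decode failed: %v", err)
	}

	if decoded.Peers[0].ConnectionString != "http://localhost:4002" {
		t.Fatalf("connection string lost in round trip")
	}
}
```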