bump(github.com/coreos/go-raft): bb7f7ec92e4cb6d98241cea83f55d0e85e624189

release-0.4
Brandon Philips 2013-08-18 19:43:13 -07:00
parent 393ed439b1
commit 449cad4658
13 changed files with 461 additions and 226 deletions

View File

@ -1,19 +1,49 @@
[![Build Status](https://travis-ci.org/benbjohnson/go-raft.png?branch=master)](https://travis-ci.org/benbjohnson/go-raft)
go-raft
=======
[![Build Status](https://travis-ci.org/goraft/raft.png?branch=master)](https://travis-ci.org/goraft/raft)
## Overview
This is an Go implementation of the Raft distributed consensus protocol.
This is a Go implementation of the Raft distributed consensus protocol.
Raft is a protocol by which a cluster of nodes can maintain a replicated state machine.
The state machine is kept in sync through the use of a replicated log.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
For more details on Raft, you can read [In Search of an Understandable Consensus Algorithm][raft-paper] by Diego Ongaro and John Ousterhout.
## Project Status
This library is feature complete but should be considered experimental until it has seen more usage.
If you have any questions on implementing go-raft in your project please file an issue.
There is an [active community][community] of developers who can help.
go-raft is under the MIT license.
[community]: https://github.com/goraft/raft/contributors
### Features
- Leader election
- Log replication
- Configuration changes
- Log compaction
- Unit tests
- Fast Protobuf Log Encoding
- HTTP transport
### Projects
These projects are built on go-raft:
- [coreos/etcd](https://github.com/coreos/etcd) - A highly-available key value store for shared configuration and service discovery
- [benbjohnson/raftd](https://github.com/benbjohnson/raftd) - A reference implementation that demonstrates how to use the go-raft library for distributed consensus.
If your project uses go-raft, please add it to this README so others can see implementation examples.
## The Raft Protocol
This section provides a high-level summary of the Raft protocol.
For a more detailed explanation of the failover process and election terms, please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm][raft-paper].
### Overview
Maintaining state in a single process on a single server is easy.
@ -26,7 +56,7 @@ Servers can crash or the network between two machines can become unavailable or
A distributed consensus protocol is used for maintaining a consistent state across multiple servers in a cluster.
Many distributed systems are built on the Paxos protocol, but Paxos can be difficult to understand and there are many gaps between Paxos and real-world implementations.
An alternative is the [Raft distributed consensus protocol](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf) by Diego Ongaro and John Ousterhout.
An alternative is the [Raft distributed consensus protocol][raft-paper] by Diego Ongaro and John Ousterhout.
Raft is a protocol built with understandability as a primary tenet, and it centers on two things:
1. Leader Election
@ -53,17 +83,9 @@ By ensuring that this log is replicated identically between all the nodes in the
Replicating the log under normal conditions is done by sending an `AppendEntries` RPC from the leader to each of the other servers in the cluster (called Peers).
Each peer appends the entries from the leader through a two-phase commit process, which ensures that a majority of servers in the cluster have the entries written to their logs.
For a more detailed explanation on the failover process and election terms please see the full paper describing the protocol: [In Search of an Understandable Consensus Algorithm](https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf)
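As an illustration of how these ideas surface in this library's API, here is a minimal sketch of a single node committing a command. It is based only on the signatures visible in this change (`NewServer`, `Start`, `Do`, `DefaultJoinCommand`); the import path, directory, and `helloCommand` type are assumptions, not a verbatim quickstart.

```go
package main

import (
	"fmt"
	"os"

	raft "github.com/goraft/raft" // assumed import path
)

// helloCommand is a hypothetical command; its shape follows DefaultJoinCommand
// in this change: CommandName plus Apply against the server.
type helloCommand struct {
	Value string `json:"value"`
}

func (c *helloCommand) CommandName() string { return "hello" }

func (c *helloCommand) Apply(server *raft.Server) (interface{}, error) {
	// A real application would mutate its state machine here.
	return c.Value, nil
}

func main() {
	dataDir := "/tmp/raft-node1"
	os.MkdirAll(dataDir, 0700)

	// Arguments mirror the test helper in this change: name, data path,
	// transporter, plus two optional values passed as nil. A nil transporter
	// is only useful for a single-node sketch like this one.
	server, err := raft.NewServer("node1", dataDir, nil, nil, nil)
	if err != nil {
		panic(err)
	}
	if err := server.Start(); err != nil {
		panic(err)
	}

	// With an empty log, a self-join promotes the node to leader.
	if _, err := server.Do(&raft.DefaultJoinCommand{Name: server.Name(), ConnectionString: "http://localhost:4001"}); err != nil {
		panic(err)
	}

	// Do() appends the command to the log and, in a real cluster, replicates
	// it to peers via AppendEntries before it commits.
	result, err := server.Do(&helloCommand{Value: "world"})
	fmt.Println(result, err)
}
```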
## History
Ben Johnson started this library for use in his behavioral analytics database called [Sky](https://github.com/skydb/sky).
He put it under the MIT license in the hopes that it would be useful for other projects too.
## Project Status
The go-raft library is feature complete but in alpha.
There is a reference implementation called [raftd](https://github.com/benbjohnson/raftd) that demonstrates how to use the library
The library will be considered experimental until it has significant production usage.
I'm writing the library for the purpose of including distributed processing in my behavioral analytics database called [Sky](https://github.com/skydb/sky).
However, I hope other projects can benefit from having a distributed consensus protocol so the go-raft library is available under MIT license.
If you have a project that you're using go-raft in, please add it to this README and send a pull request so others can see implementation examples.
If you have any questions on implementing go-raft in your project, feel free to contact me on [GitHub](https://github.com/benbjohnson), [Twitter](https://twitter.com/benbjohnson) or by e-mail at [ben@skylandlabs.com](mailto:ben@skylandlabs.com).
[raft-paper]: https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf

View File

@ -0,0 +1,7 @@
package raft
type Config struct {
CommitIndex uint64 `json:"commitIndex"`
// TODO decide what we need to store in peer struct
Peers []*Peer `json:"peers"`
}

View File

@ -94,7 +94,7 @@ func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, r
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.AppendEntriesPath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
@ -122,7 +122,7 @@ func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *Reque
return nil
}
url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
url := fmt.Sprintf("http://%s%s", peer.Name, t.RequestVotePath())
traceln(server.Name(), "POST", url)
client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}

View File

@ -9,7 +9,8 @@ type JoinCommand interface {
// Join command
type DefaultJoinCommand struct {
Name string `json:"name"`
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
}
// The name of the Join command in the log
@ -18,7 +19,7 @@ func (c *DefaultJoinCommand) CommandName() string {
}
func (c *DefaultJoinCommand) Apply(server *Server) (interface{}, error) {
err := server.AddPeer(c.Name)
err := server.AddPeer(c.Name, c.ConnectionString)
return []byte("join"), err
}
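Since the join command now carries a connection string, adding a member means committing that command on the leader. A hedged sketch of a helper built on the calls shown above (the package name, helper name, and addresses are illustrative):

```go
package cluster

import (
	"fmt"

	raft "github.com/goraft/raft" // assumed import path
)

// AddFollower asks the current leader to commit a join entry for a new node.
// When the entry is applied, every member calls AddPeer(name, connectionString),
// and the leader starts heartbeating the new peer.
func AddFollower(leader *raft.Server, name, connectionString string) error {
	if _, err := leader.Do(&raft.DefaultJoinCommand{
		Name:             name,
		ConnectionString: connectionString,
	}); err != nil {
		return fmt.Errorf("join of %s failed: %v", name, err)
	}
	return nil
}
```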

View File

@ -183,6 +183,15 @@ func (l *Log) open(path string) error {
// Append entry.
l.entries = append(l.entries, entry)
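// Re-apply entries that were already committed so the state machine is
// restored when the log is reloaded.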
if entry.Index <= l.commitIndex {
command, err := newCommand(entry.CommandName, entry.Command)
if err != nil {
continue
}
l.ApplyFunc(command)
}
debugln("open.log.append log index ", entry.Index)
readBytes += int64(n)

View File

@ -14,7 +14,8 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
server *Server
name string
Name string `json:"name"`
ConnectionString string `json:"connectionString"`
prevLogIndex uint64
mutex sync.RWMutex
stopChan chan bool
@ -28,10 +29,11 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer {
func newPeer(server *Server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
return &Peer{
server: server,
name: name,
Name: name,
ConnectionString: connectionString,
heartbeatTimeout: heartbeatTimeout,
}
}
@ -42,11 +44,6 @@ func newPeer(server *Server, name string, heartbeatTimeout time.Duration) *Peer
//
//------------------------------------------------------------------------------
// Retrieves the name of the peer.
func (p *Peer) Name() string {
return p.name
}
// Sets the heartbeat timeout.
func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
p.heartbeatTimeout = duration
@ -89,17 +86,17 @@ func (p *Peer) startHeartbeat() {
}
// Stops the peer heartbeat.
func (p *Peer) stopHeartbeat() {
func (p *Peer) stopHeartbeat(flush bool) {
// Note: the stop channel used to be unbuffered, so the leader could block
// when the heartbeat goroutine returns.
// The channel now has a buffer of 1, and we panic here if the send would block.
select {
case p.stopChan <- true:
case p.stopChan <- flush:
default:
panic("[" + p.server.Name() + "] cannot stop [" + p.Name() + "] heartbeat")
panic("[" + p.server.Name() + "] cannot stop [" + p.Name + "] heartbeat")
}
}
@ -113,8 +110,9 @@ func (p *Peer) clone() *Peer {
p.mutex.Lock()
defer p.mutex.Unlock()
return &Peer{
name: p.name,
prevLogIndex: p.prevLogIndex,
Name: p.Name,
ConnectionString: p.ConnectionString,
prevLogIndex: p.prevLogIndex,
}
}
@ -128,46 +126,58 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
debugln("peer.heartbeat: ", p.Name(), p.heartbeatTimeout)
debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
for {
select {
case <-stopChan:
debugln("peer.heartbeat.stop: ", p.Name())
return
case <-time.After(p.heartbeatTimeout):
debugln("peer.heartbeat.run: ", p.Name())
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
case flush := <-stopChan:
if !flush {
debugln("peer.heartbeat.stop: ", p.Name)
return
} else {
// Before we can safely remove a node,
// we must first flush the remove command to it.
p.flush()
debugln("peer.heartbeat.stop: ", p.Name)
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
case <-time.After(p.heartbeatTimeout):
p.flush()
}
}
}
func (p *Peer) flush() {
debugln("peer.heartbeat.run: ", p.Name)
prevLogIndex := p.getPrevLogIndex()
entries, prevLogTerm := p.server.log.getEntriesAfter(prevLogIndex, p.server.maxLogEntriesPerRequest)
if p.server.State() != Leader {
return
}
if entries != nil {
p.sendAppendEntriesRequest(newAppendEntriesRequest(p.server.currentTerm, prevLogIndex, prevLogTerm, p.server.log.CommitIndex(), p.server.name, entries))
} else {
p.sendSnapshotRequest(newSnapshotRequest(p.server.name, p.server.lastSnapshot))
}
}
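The buffered stop channel carrying a flush flag is a small, general Go pattern. A standalone sketch of the same idea, independent of this library (names here are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// worker mimics the heartbeat loop: the stop channel carries a flag telling
// the loop whether to do one final flush before exiting.
func worker(stop chan bool) {
	for {
		select {
		case flush := <-stop:
			if flush {
				fmt.Println("flushing once more before stopping")
			}
			fmt.Println("stopped")
			return
		case <-time.After(50 * time.Millisecond):
			fmt.Println("periodic work")
		}
	}
}

func main() {
	stop := make(chan bool, 1) // buffer of 1 so the sender never blocks
	go worker(stop)
	time.Sleep(120 * time.Millisecond)

	// Non-blocking send: if the buffer is already full something is wrong.
	select {
	case stop <- true: // true == flush before stopping
	default:
		panic("cannot stop worker")
	}
	time.Sleep(100 * time.Millisecond)
}
```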
//--------------------------------------
// Append Entries
//--------------------------------------
// Sends an AppendEntries request to the peer through the transport.
func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name(), " ", len(req.Entries))
traceln("peer.flush.send: ", p.server.Name(), "->", p.Name, " ", len(req.Entries))
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name())
debugln("peer.flush.timeout: ", p.server.Name(), "->", p.Name)
return
}
traceln("peer.flush.recv: ", p.Name())
traceln("peer.flush.recv: ", p.Name)
// If successful then update the previous log index.
p.mutex.Lock()
@ -181,7 +191,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp.append = true
}
}
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name(), "; idx =", p.prevLogIndex)
traceln("peer.flush.success: ", p.server.Name(), "->", p.Name, "; idx =", p.prevLogIndex)
// If it was unsuccessful then decrement the previous log index and
// we'll try again next time.
@ -195,7 +205,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.CommitIndex
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.commitIndex: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
} else if p.prevLogIndex > 0 {
// Decrement the previous log index down until we find a match. Don't
// let it go below where the peer's commit index is though. That's a
@ -206,35 +216,35 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
p.prevLogIndex = resp.Index
}
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name(), " idx =", p.prevLogIndex)
debugln("peer.flush.decrement: ", p.server.Name(), "->", p.Name, " idx =", p.prevLogIndex)
}
}
p.mutex.Unlock()
// Attach the peer to the response so the server knows where it came from.
resp.peer = p.Name()
resp.peer = p.Name
// Send response to server for processing.
p.server.send(resp)
}
// Sends a Snapshot request to the peer through the transport.
func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
debugln("peer.snap.send: ", p.name)
debugln("peer.snap.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRequest(p.server, p, req)
if resp == nil {
debugln("peer.snap.timeout: ", p.name)
debugln("peer.snap.timeout: ", p.Name)
return
}
debugln("peer.snap.recv: ", p.name)
debugln("peer.snap.recv: ", p.Name)
// If successful, the peer should have been set to snapshot state.
// Send it the snapshot!
if resp.Success {
p.sendSnapshotRecoveryRequest()
} else {
debugln("peer.snap.failed: ", p.name)
debugln("peer.snap.failed: ", p.Name)
return
}
@ -243,12 +253,12 @@ func (p *Peer) sendSnapshotRequest(req *SnapshotRequest) {
// Sends a Snapshot Recovery request to the peer through the transport.
func (p *Peer) sendSnapshotRecoveryRequest() {
req := newSnapshotRecoveryRequest(p.server.name, p.server.lastSnapshot)
debugln("peer.snap.recovery.send: ", p.name)
debugln("peer.snap.recovery.send: ", p.Name)
resp := p.server.Transporter().SendSnapshotRecoveryRequest(p.server, p, req)
if resp.Success {
p.prevLogIndex = req.LastIndex
} else {
debugln("peer.snap.recovery.failed: ", p.name)
debugln("peer.snap.recovery.failed: ", p.Name)
return
}
// Send response to server for processing.
@ -261,10 +271,10 @@ func (p *Peer) sendSnapshotRecoveryRequest() {
// Sends a RequestVote request to the peer through the transport.
func (p *Peer) sendVoteRequest(req *RequestVoteRequest, c chan *RequestVoteResponse) {
debugln("peer.vote: ", p.server.Name(), "->", p.Name())
debugln("peer.vote: ", p.server.Name(), "->", p.Name)
req.peer = p
if resp := p.server.Transporter().SendVoteRequest(p.server, p, req); resp != nil {
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name())
debugln("peer.vote: recv", p.server.Name(), "<-", p.Name)
resp.peer = p
c <- resp
}

View File

@ -14,12 +14,12 @@ var _ = &json.SyntaxError{}
var _ = math.Inf
type ProtoSnapshotRecoveryRequest struct {
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []string `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
LeaderName *string `protobuf:"bytes,1,req" json:"LeaderName,omitempty"`
LastIndex *uint64 `protobuf:"varint,2,req" json:"LastIndex,omitempty"`
LastTerm *uint64 `protobuf:"varint,3,req" json:"LastTerm,omitempty"`
Peers []*ProtoSnapshotRecoveryRequest_ProtoPeer `protobuf:"bytes,4,rep" json:"Peers,omitempty"`
State []byte `protobuf:"bytes,5,req" json:"State,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest) Reset() { *m = ProtoSnapshotRecoveryRequest{} }
@ -47,7 +47,7 @@ func (m *ProtoSnapshotRecoveryRequest) GetLastTerm() uint64 {
return 0
}
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []string {
func (m *ProtoSnapshotRecoveryRequest) GetPeers() []*ProtoSnapshotRecoveryRequest_ProtoPeer {
if m != nil {
return m.Peers
}
@ -61,5 +61,31 @@ func (m *ProtoSnapshotRecoveryRequest) GetState() []byte {
return nil
}
type ProtoSnapshotRecoveryRequest_ProtoPeer struct {
Name *string `protobuf:"bytes,1,req" json:"Name,omitempty"`
ConnectionString *string `protobuf:"bytes,2,req" json:"ConnectionString,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) Reset() {
*m = ProtoSnapshotRecoveryRequest_ProtoPeer{}
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) String() string { return proto.CompactTextString(m) }
func (*ProtoSnapshotRecoveryRequest_ProtoPeer) ProtoMessage() {}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetName() string {
if m != nil && m.Name != nil {
return *m.Name
}
return ""
}
func (m *ProtoSnapshotRecoveryRequest_ProtoPeer) GetConnectionString() string {
if m != nil && m.ConnectionString != nil {
return *m.ConnectionString
}
return ""
}
func init() {
}

View File

@ -3,7 +3,13 @@ package protobuf;
message ProtoSnapshotRecoveryRequest {
required string LeaderName=1;
required uint64 LastIndex=2;
required uint64 LastTerm=3;
repeated string Peers=4;
required uint64 LastTerm=3;
message ProtoPeer {
required string Name=1;
required string ConnectionString=2;
}
repeated ProtoPeer Peers=4;
required bytes State=5;
}

View File

@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"os"
"path"
@ -81,8 +80,6 @@ type Server struct {
lastSnapshot *Snapshot
stateMachine StateMachine
maxLogEntriesPerRequest uint64
confFile *os.File
}
// An event to be processed by the server's event loop.
@ -272,11 +269,15 @@ func (s *Server) QuorumSize() int {
// Retrieves the election timeout.
func (s *Server) ElectionTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.electionTimeout
}
// Sets the election timeout.
func (s *Server) SetElectionTimeout(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.electionTimeout = duration
}
@ -286,6 +287,8 @@ func (s *Server) SetElectionTimeout(duration time.Duration) {
// Retrieves the heartbeat timeout.
func (s *Server) HeartbeatTimeout() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.heartbeatTimeout
}
@ -332,14 +335,14 @@ func (s *Server) Start() error {
// Create the snapshot directory if it does not already exist.
os.Mkdir(path.Join(s.path, "snapshot"), 0700)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
if err := s.readConf(); err != nil {
s.debugln("raft: Conf file error: ", err)
// Initialize the log and load it up.
if err := s.log.open(s.LogPath()); err != nil {
s.debugln("raft: Log error: ", err)
return fmt.Errorf("raft: Initialization error: %s", err)
}
@ -368,59 +371,12 @@ func (s *Server) Start() error {
return nil
}
// Read the configuration for the server.
func (s *Server) readConf() error {
var err error
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
s.confFile, err = os.OpenFile(confPath, os.O_RDWR, 0600)
if err != nil {
if os.IsNotExist(err) {
s.confFile, err = os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE, 0600)
debugln("readConf.create ", confPath)
if err != nil {
return err
}
}
return err
}
peerNames := make([]string, 0)
for {
var peerName string
_, err = fmt.Fscanf(s.confFile, "%s\n", &peerName)
if err != nil {
if err == io.EOF {
s.debugln("server.peer.conf: finish")
break
}
return err
}
s.debugln("server.peer.conf.read: ", peerName)
peerNames = append(peerNames, peerName)
}
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for _, peerName := range peerNames {
s.AddPeer(peerName)
}
return nil
}
// Shuts down the server.
func (s *Server) Stop() {
s.send(&stopValue)
s.mutex.Lock()
defer s.mutex.Unlock()
s.log.close()
s.mutex.Unlock()
}
// Checks if the server is currently running.
@ -532,24 +488,27 @@ func (s *Server) followerLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(JoinCommand); ok {
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && command.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(command, e)
} else {
} else {
switch req := e.target.(type) {
case JoinCommand:
//If no log entries exist and a self-join command is issued
//then immediately become leader and commit entry.
if s.log.currentIndex() == 0 && req.NodeName() == s.Name() {
s.debugln("selfjoin and promote to leader")
s.setState(Leader)
s.processCommand(req, e)
} else {
err = NotLeaderError
}
case *AppendEntriesRequest:
e.returnValue, update = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, update = s.processRequestVoteRequest(req)
case *SnapshotRequest:
e.returnValue = s.processSnapshotRequest(req)
default:
err = NotLeaderError
}
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, update = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, update = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRequest); ok {
e.returnValue = s.processSnapshotRequest(req)
} else {
err = NotLeaderError
}
// Callback to event.
@ -629,14 +588,16 @@ func (s *Server) candidateLoop() {
var err error
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
e.c <- err
@ -660,7 +621,7 @@ func (s *Server) candidateLoop() {
}
}
// The event loop that is run when the server is in a Candidate state.
// The event loop that is run when the server is in a Leader state.
func (s *Server) leaderLoop() {
s.setState(Leader)
s.syncedPeer = make(map[string]bool)
@ -682,15 +643,18 @@ func (s *Server) leaderLoop() {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if command, ok := e.target.(Command); ok {
s.processCommand(command, e)
continue
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
s.processAppendEntriesResponse(resp)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else {
switch req := e.target.(type) {
case Command:
s.processCommand(req, e)
continue
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *AppendEntriesResponse:
s.processAppendEntriesResponse(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
}
}
// Callback to event.
@ -705,7 +669,7 @@ func (s *Server) leaderLoop() {
// Stop all peers.
for _, peer := range s.peers {
peer.stopHeartbeat()
peer.stopHeartbeat(false)
}
s.syncedPeer = nil
}
@ -720,16 +684,18 @@ func (s *Server) snapshotLoop() {
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue, _ = s.processRequestVoteRequest(req)
} else if req, ok := e.target.(*SnapshotRecoveryRequest); ok {
e.returnValue = s.processSnapshotRecoveryRequest(req)
} else {
switch req := e.target.(type) {
case Command:
err = NotLeaderError
case *AppendEntriesRequest:
e.returnValue, _ = s.processAppendEntriesRequest(req)
case *RequestVoteRequest:
e.returnValue, _ = s.processRequestVoteRequest(req)
case *SnapshotRecoveryRequest:
e.returnValue = s.processSnapshotRecoveryRequest(req)
}
}
// Callback to event.
e.c <- err
@ -959,31 +925,29 @@ func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVot
//--------------------------------------
// Adds a peer to the server.
func (s *Server) AddPeer(name string) error {
func (s *Server) AddPeer(name string, connectionString string) error {
s.debugln("server.peer.add: ", name, len(s.peers))
defer s.writeConf()
// Do not allow peers to be added twice.
if s.peers[name] != nil {
return nil
}
// Only add the peer if it doesn't have the same name.
if s.name != name {
// when loading snapshot s.confFile should be nil
if s.confFile != nil {
_, err := fmt.Fprintln(s.confFile, name)
s.debugln("server.peer.conf.write: ", name)
if err != nil {
return err
}
}
peer := newPeer(s, name, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.name] = peer
// Skip the Peer if it has the same name as the Server
if s.name == name {
return nil
}
peer := newPeer(s, name, connectionString, s.heartbeatTimeout)
if s.State() == Leader {
peer.startHeartbeat()
}
s.peers[peer.Name] = peer
s.debugln("server.peer.conf.write: ", name)
return nil
}
@ -991,8 +955,12 @@ func (s *Server) AddPeer(name string) error {
func (s *Server) RemovePeer(name string) error {
s.debugln("server.peer.remove: ", name, len(s.peers))
// Ignore removal of the server itself.
if s.name == name {
defer s.writeConf()
if name == s.Name() {
// When the removed node restarts, it should be able to
// know that it was removed before, so we need
// to update knownCommitIndex.
return nil
}
// Return error if peer doesn't exist.
@ -1001,23 +969,13 @@ func (s *Server) RemovePeer(name string) error {
return fmt.Errorf("raft: Peer not found: %s", name)
}
// TODO: Flush entries to the peer first.
// Stop peer and remove it.
peer.stopHeartbeat()
if s.State() == Leader {
peer.stopHeartbeat(true)
}
delete(s.peers, name)
s.confFile.Truncate(0)
s.confFile.Seek(0, os.SEEK_SET)
for peer := range s.peers {
_, err := fmt.Fprintln(s.confFile, peer)
if err != nil {
return err
}
}
return nil
}
@ -1054,14 +1012,13 @@ func (s *Server) TakeSnapshot() error {
state = []byte{0}
}
var peerNames []string
var peers []*Peer
for _, peer := range s.peers {
peerNames = append(peerNames, peer.Name())
peers = append(peers, peer.clone())
}
peerNames = append(peerNames, s.Name())
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peerNames, state, path}
s.currentSnapshot = &Snapshot{lastIndex, lastTerm, peers, state, path}
s.saveSnapshot()
@ -1144,8 +1101,8 @@ func (s *Server) processSnapshotRecoveryRequest(req *SnapshotRecoveryRequest) *S
s.peers = make(map[string]*Peer)
// recover the cluster configuration
for _, peerName := range req.Peers {
s.AddPeer(peerName)
for _, peer := range req.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
//update term and index
@ -1237,8 +1194,8 @@ func (s *Server) LoadSnapshot() error {
return err
}
for _, peerName := range s.lastSnapshot.Peers {
s.AddPeer(peerName)
for _, peer := range s.lastSnapshot.Peers {
s.AddPeer(peer.Name, peer.ConnectionString)
}
s.log.startTerm = s.lastSnapshot.LastTerm
@ -1248,6 +1205,62 @@ func (s *Server) LoadSnapshot() error {
return err
}
//--------------------------------------
// Config File
//--------------------------------------
func (s *Server) writeConf() {
peers := make([]*Peer, len(s.peers))
i := 0
for _, peer := range s.peers {
peers[i] = peer.clone()
i++
}
r := &Config{
CommitIndex: s.log.commitIndex,
Peers: peers,
}
b, _ := json.Marshal(r)
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")
err := ioutil.WriteFile(tmpConfPath, b, 0600)
if err != nil {
panic(err)
}
os.Rename(tmpConfPath, confPath)
}
// Read the configuration for the server.
func (s *Server) readConf() error {
confPath := path.Join(s.path, "conf")
s.debugln("readConf.open ", confPath)
// open conf file
b, err := ioutil.ReadFile(confPath)
if err != nil {
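// A missing or unreadable conf file simply means there is no saved configuration to restore.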
return nil
}
conf := &Config{}
if err = json.Unmarshal(b, conf); err != nil {
return err
}
s.log.commitIndex = conf.CommitIndex
return nil
}
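The conf file replaces the old line-per-peer format with a single JSON document, written to a temporary file and renamed into place so a crash never leaves a half-written configuration behind. A standalone sketch of the same pattern (the struct names mirror, but are not, the library's own types):

```go
package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
)

// peerInfo and config mirror the shape persisted by writeConf;
// they are local illustrations only.
type peerInfo struct {
	Name             string `json:"name"`
	ConnectionString string `json:"connectionString"`
}

type config struct {
	CommitIndex uint64      `json:"commitIndex"`
	Peers       []*peerInfo `json:"peers"`
}

func writeConf(dir string, c *config) error {
	b, err := json.Marshal(c)
	if err != nil {
		return err
	}
	tmp := filepath.Join(dir, "conf.tmp")
	if err := ioutil.WriteFile(tmp, b, 0600); err != nil {
		return err
	}
	// Rename is atomic on POSIX filesystems, so readers see either the old
	// conf or the new one, never a partial write.
	return os.Rename(tmp, filepath.Join(dir, "conf"))
}

func readConf(dir string) (*config, error) {
	b, err := ioutil.ReadFile(filepath.Join(dir, "conf"))
	if err != nil {
		if os.IsNotExist(err) {
			return &config{}, nil // no conf yet: start empty
		}
		return nil, err
	}
	c := &config{}
	return c, json.Unmarshal(b, c)
}

func main() {
	dir, _ := ioutil.TempDir("", "raft-conf")
	defer os.RemoveAll(dir)

	in := &config{CommitIndex: 16, Peers: []*peerInfo{{Name: "2", ConnectionString: "http://10.0.0.2:4001"}}}
	if err := writeConf(dir, in); err != nil {
		panic(err)
	}
	out, err := readConf(dir)
	fmt.Println(out.CommitIndex, len(out.Peers), err)
}
```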
//--------------------------------------
// Debugging
//--------------------------------------

View File

@ -164,10 +164,10 @@ func TestServerPromote(t *testing.T) {
lookup := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name()].RequestVote(req)
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name()].AppendEntries(req)
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
@ -316,6 +316,124 @@ func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
}
}
//--------------------------------------
// Recovery
//--------------------------------------
// Ensure that a server can recover its log and peer configuration from a previous run.
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]*Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
var paths = make(map[string]string)
n := 5
// add n servers
for i := 1; i <= n; i++ {
names = append(names, strconv.Itoa(i))
}
var leader *Server
for _, name := range names {
server := newTestServer(name, transporter)
servers[name] = server
paths[name] = server.Path()
if name == "1" {
leader = server
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
} else {
server.SetElectionTimeout(testElectionTimeout)
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
time.Sleep(testHeartbeatTimeout)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
}
}
// commit some commands
for i := 0; i < 10; i++ {
if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
t.Fatalf("cannot commit command:", err.Error())
}
}
time.Sleep(2 * testHeartbeatTimeout)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
for _, name := range names {
// with old path and disable transportation
server := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = server
server.Start()
// should only commit to the last join command
if server.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 6)
}
// peer conf should be recovered
if len(server.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(server.Peers()), 4)
}
}
// let nodes talk to each other
for _, name := range names {
servers[name].SetTransporter(transporter)
}
time.Sleep(2 * testElectionTimeout)
// should commit to the previous index + 1 (a NOP command is appended when a new leader is elected)
for _, name := range names {
server := servers[name]
if server.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, server.CommitIndex(), 16)
}
server.Stop()
}
}
//--------------------------------------
// Membership
//--------------------------------------
@ -357,13 +475,13 @@ func TestServerMultiNode(t *testing.T) {
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
s := servers[peer.name]
s := servers[peer.Name]
mutex.RUnlock()
return s.AppendEntries(req)
}

View File

@ -21,9 +21,9 @@ type Snapshot struct {
LastIndex uint64 `json:"lastIndex"`
LastTerm uint64 `json:"lastTerm"`
// cluster configuration.
Peers []string `json: "peers"`
State []byte `json: "state"`
Path string `json: "path"`
Peers []*Peer `json:"peers"`
State []byte `json:"state"`
Path string `json:"path"`
}
// Save the snapshot to a file

View File

@ -12,7 +12,7 @@ type SnapshotRecoveryRequest struct {
LeaderName string
LastIndex uint64
LastTerm uint64
Peers []string
Peers []*Peer
State []byte
}
@ -36,11 +36,21 @@ func newSnapshotRecoveryRequest(leaderName string, snapshot *Snapshot) *Snapshot
// Encodes the SnapshotRecoveryRequest to a buffer. Returns the number of bytes
// written and any error that may have occurred.
func (req *SnapshotRecoveryRequest) encode(w io.Writer) (int, error) {
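// Convert each peer into the nested protobuf Peer message so the snapshot
// carries both the peer name and its connection string.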
protoPeers := make([]*protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer, len(req.Peers))
for i, peer := range req.Peers {
protoPeers[i] = &protobuf.ProtoSnapshotRecoveryRequest_ProtoPeer{
Name: proto.String(peer.Name),
ConnectionString: proto.String(peer.ConnectionString),
}
}
pb := &protobuf.ProtoSnapshotRecoveryRequest{
LeaderName: proto.String(req.LeaderName),
LastIndex: proto.Uint64(req.LastIndex),
LastTerm: proto.Uint64(req.LastTerm),
Peers: req.Peers,
Peers: protoPeers,
State: req.State,
}
p, err := proto.Marshal(pb)
@ -62,7 +72,7 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
totalBytes := len(data)
pb := &protobuf.ProtoSnapshotRequest{}
pb := &protobuf.ProtoSnapshotRecoveryRequest{}
if err = proto.Unmarshal(data, pb); err != nil {
return -1, err
}
@ -70,8 +80,16 @@ func (req *SnapshotRecoveryRequest) decode(r io.Reader) (int, error) {
req.LeaderName = pb.GetLeaderName()
req.LastIndex = pb.GetLastIndex()
req.LastTerm = pb.GetLastTerm()
req.Peers = req.Peers
req.State = req.State
req.Peers = make([]*Peer, len(pb.Peers))
for i, peer := range pb.Peers {
req.Peers[i] = &Peer{
Name: peer.GetName(),
ConnectionString: peer.GetConnectionString(),
}
}
return totalBytes, nil
}

View File

@ -69,6 +69,11 @@ func newTestServer(name string, transporter Transporter) *Server {
return server
}
func newTestServerWithPath(name string, transporter Transporter, p string) *Server {
server, _ := NewServer(name, p, transporter, nil, nil)
return server
}
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) *Server {
server := newTestServer(name, transporter)
f, err := os.Create(server.LogPath())
@ -100,7 +105,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]*
server.SetHeartbeatTimeout(testHeartbeatTimeout)
server.Start()
for _, peer := range servers {
server.AddPeer(peer.Name())
server.AddPeer(peer.Name(), "")
}
}
return servers