tinyraft/tinyraft.js

318 lines
9.9 KiB
JavaScript

// TinyRaft - Raft leader election isolated from the rest of the algorithm
//
// Can be used as a building block for the standard Raft with a K/V database
// and log-based replication or for other variations of "weak consensus".
//
// Leader is not required to replicate logs - he just has to support a recovery
// procedure making all replicas consistent. For example, he can use a CRDT or
// perform erasure coding on entries. He may use term number as logical time.
// The only requirement is to guarantee preservation of entries confirmed by
// all hosts participating in consensus.
//
// Supports leader expiration like in NuRaft:
// https://github.com/eBay/NuRaft/blob/master/docs/leadership_expiration.md
//
// Also supports leader priorities, similar to NuRaft but even simpler:
// If a node receives a VoteRequest message with larger term but with smaller
// priority than its own, it immediately starts a new voting round.
// It guarantees that a node with non-maximum priority can't become leader
// without being re-elected.
// If all priorities are equal (or just zero), the election algorithm
// becomes identical to the basic algorithm without priorities.
const EventEmitter = require('events');
// Types of messages exchanged between nodes through config.send() / onReceive()
const [ VOTE_REQUEST, VOTE, PING, PONG ] =
    [ 'vote_request', 'vote', 'ping', 'pong' ];
// Node states reported in 'change' events and kept in `state`
const [ CANDIDATE, LEADER, FOLLOWER ] =
    [ 'candidate', 'leader', 'follower' ];
// A single TinyRaft node. It only performs leader election and emits 'change'
// events ({ state, term, leader?, followers? }) when its view of the cluster changes.
// Node ids are compared with loose equality (==) throughout, as in the original code.
class TinyRaft extends EventEmitter
{
    // config: {
    //     nodes: (number|string)[],    // all cluster members, including this node
    //     nodeId: number|string,       // id of this node (must be truthy)
    //     send: (to, msg) => void,     // transport callback used for every outgoing message
    //     electionTimeout?: number,    // default 5000
    //     randomTimeout?: number,      // max random extra delay before an election, default = electionTimeout
    //     heartbeatTimeout?: number,   // interval between leader PINGs, default 1000
    //     leadershipTimeout?: number,  // > 0 enables leader expiration through PONGs, default 0 (disabled)
    //     initialTerm?: number,        // default 0, negative values are clamped to 0
    //     leaderPriority?: number,
    // }
    // Throws if nodeId, nodes or send are missing or malformed.
    constructor(config)
    {
        super();
        this.nodes = config.nodes;
        this.nodeId = config.nodeId;
        this.send = config.send;
        this.electionTimeout = Number(config.electionTimeout) || 5000;
        this.randomTimeout = config.randomTimeout > 0 ? Number(config.randomTimeout) : this.electionTimeout;
        this.heartbeatTimeout = Number(config.heartbeatTimeout) || 1000;
        this.leadershipTimeout = Number(config.leadershipTimeout) || 0;
        this.leaderPriority = Number(config.leaderPriority) || undefined;
        if (!this.nodeId || this.nodeId instanceof Object ||
            !(this.nodes instanceof Array) || this.nodes.filter(n => !n || n instanceof Object).length > 0 ||
            !(this.send instanceof Function))
            throw new Error('nodeId, nodes, send are required fields of `config`');
        this.term = Number(config.initialTerm) || 0;
        if (this.term < 0)
            this.term = 0;
        this.state = null;
        this.leader = null;
        // Per-term vote bookkeeping: votes[leaderId] = list of voters, voted = number of
        // counted votes. Initialized here so that a VOTE received before start() doesn't
        // dereference `undefined`.
        this.votes = {};
        this.voted = 0;
        this.followers = null;
        this.electionTimer = null;
        this.heartbeatTimer = null;
    }

    // Size of a strict majority of the current node list
    _quorum()
    {
        return 1 + (0 | this.nodes.length/2);
    }

    // Cancel the pending election timer and, if `after` >= 0, schedule start()
    // in `after` ms plus a random delay of up to randomTimeout ms.
    _nextTerm(after)
    {
        if (this.electionTimer)
        {
            clearTimeout(this.electionTimer);
            this.electionTimer = null;
        }
        if (after >= 0)
        {
            this.electionTimer = setTimeout(() => this.start(), after + this.randomTimeout * Math.random());
        }
    }

    // Start a new term as a candidate for ourselves and request votes from all other nodes
    start()
    {
        this._nextTerm(-1);
        this.term++;
        this.voted = 1;
        this.votes = { [this.nodeId]: [ this.nodeId ] };
        this.state = CANDIDATE;
        this.followers = null;
        this.leader = this.nodeId;
        if (!this.heartbeatTimer)
        {
            this.heartbeatTimer = setInterval(() => this._heartbeat(), this.heartbeatTimeout);
        }
        for (const node of this.nodes)
        {
            if (node != this.nodeId)
            {
                this.send(node, { type: VOTE_REQUEST, term: this.term, leader: this.leader, priority: this.leaderPriority });
            }
        }
        // Next term will start right after this one times out
        this._nextTerm(this.electionTimeout);
        this.emit('change', { state: this.state, term: this.term });
    }

    // Cancel the election timer and the heartbeat interval; the state is left as is
    stop()
    {
        this._nextTerm(-1);
        if (this.heartbeatTimer)
        {
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
    }

    // When we are the leader, send a PING to every follower except ourselves
    _heartbeat()
    {
        if (this.state == LEADER)
        {
            for (const node of this.followers)
            {
                if (node != this.nodeId)
                {
                    this.send(node, { type: PING, term: this.term, priority: this.leaderPriority });
                }
            }
        }
    }

    // Dispatch a message delivered by the transport; unknown types are ignored
    onReceive(from, msg)
    {
        if (msg.type == VOTE_REQUEST)
        {
            this._onReceiveVoteRequest(from, msg);
        }
        else if (msg.type == VOTE)
        {
            this._onReceiveVote(from, msg);
        }
        else if (msg.type == PING)
        {
            this._onReceivePing(from, msg);
        }
        else if (msg.type == PONG)
        {
            this._onReceivePong(from, msg);
        }
    }

    // A candidate asks for our vote. We join a newer term unless our own priority is
    // higher, in which case we start a competing round. Either way we reply with our
    // current leader.
    _onReceiveVoteRequest(from, msg)
    {
        if (msg.term > this.term && msg.leader)
        {
            if (this.leaderPriority && (msg.priority||0) < this.leaderPriority)
            {
                this.term = msg.term;
                this.start();
            }
            else
            {
                this._joinTerm(msg.term, msg.leader);
            }
        }
        const prio = (this.leader == this.nodeId ? this.leaderPriority : undefined);
        // The field is named `priority`, which is what the vote forwarding in
        // _onReceiveVote reads.
        this.send(from, { type: VOTE, term: this.term, leader: this.leader, priority: prio });
    }

    // Adopt a newer term in which `leader` is the candidate. Wait for its PING as a
    // candidate, step down if we were the leader, and drop votes counted in older terms.
    _joinTerm(term, leader)
    {
        this.term = term;
        this.leader = leader;
        this.state = CANDIDATE;
        this.followers = null;
        this.votes = {};
        this.voted = 0;
        this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
        this.emit('change', { state: this.state, term: this.term, leader: this.leader });
    }

    // Count a vote (a node reporting its current leader for msg.term) and react to
    // reaching a quorum, to a late vote for an established leader, or to a lost round.
    _onReceiveVote(from, msg)
    {
        if (!msg.leader || msg.term < this.term)
        {
            return;
        }
        if (msg.term > this.term)
        {
            this._joinTerm(msg.term, msg.leader);
            // Repeat VOTE to the leader to join it
            this.send(this.leader, { type: VOTE, term: this.term, leader: this.leader, priority: msg.priority });
        }
        // add <from> as voter for <msg.leader>
        const voters = this.votes[msg.leader] = this.votes[msg.leader] || [];
        if (!voters.some(voter => voter == from))
        {
            this.voted++;
            voters.push(from);
        }
        const n = voters.length;
        if (n == this._quorum())
        {
            if (msg.leader == this.nodeId)
            {
                this.leader = msg.leader;
                this.state = LEADER;
                this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
                this.followers = this.votes[this.nodeId];
                // Send a heartbeat to confirm leadership
                this._heartbeat();
                this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
            }
            else
            {
                this._nextTerm(0);
            }
        }
        else if (n > this.nodes.length/2 && this.state == LEADER && msg.leader == this.nodeId)
        {
            this.followers = this.votes[this.nodeId];
            // Send a heartbeat to confirm leadership
            this.send(from, { type: PING, term: this.term, priority: this.leaderPriority });
            this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
        }
        else if (this._isVotingFailed())
        {
            this._nextTerm(0);
        }
    }

    // A PING from the leader we voted for (in our term) makes us its follower and
    // keeps our election timer from firing. When leader expiration is enabled, we
    // answer with a PONG naming our leader.
    _onReceivePing(from, msg)
    {
        if (this.state == CANDIDATE && this.term == msg.term && from == this.leader)
        {
            this.state = FOLLOWER;
            this.emit('change', { state: this.state, term: this.term, leader: this.leader });
        }
        if (this.state == FOLLOWER && from == this.leader)
        {
            this.markAlive();
        }
        if (this.leadershipTimeout > 0)
        {
            this.send(from, { type: PONG, term: this.term, leader: this.leader });
        }
    }

    // Leader only: a PONG naming another leader triggers a new election; otherwise
    // the leadership timeout is extended.
    _onReceivePong(from, msg)
    {
        if (this.state != LEADER)
        {
            return;
        }
        if (msg.leader != this.nodeId)
        {
            this.start();
        }
        else
        {
            this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
        }
    }

    // Postpone the next election because the leader has shown signs of life
    markAlive()
    {
        this.leaderTimedOut = false;
        this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
    }

    // The round has failed when no candidate can reach a majority, even if every node
    // that hasn't been counted yet voted for it.
    _isVotingFailed()
    {
        const remaining = this.nodes.length - this.voted;
        for (const id in this.votes)
        {
            // .length is required: adding a number to an array concatenates strings
            if (this.votes[id].length + remaining > this.nodes.length/2)
            {
                return false;
            }
        }
        return true;
    }

    // Replace the cluster member list. An empty list means "only this node". A leader
    // drops removed followers and re-elects if it loses its quorum. A candidate, or a
    // follower whose leader was removed, starts a new election.
    setNodes(nodes)
    {
        const list = nodes.length ? nodes : [ this.nodeId ];
        this.nodes = list;
        if (this.state == LEADER)
        {
            this.votes[this.nodeId] = this.votes[this.nodeId].filter(n => list.indexOf(n) >= 0);
            // followers aliases votes[nodeId]: re-point it so heartbeats stop going to removed nodes
            this.followers = this.votes[this.nodeId];
            if (this.followers.length < this._quorum())
                this.start();
            else
                this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
        }
        else if ((this.state == FOLLOWER && list.indexOf(this.leader) < 0) || this.state == CANDIDATE)
        {
            this.start();
        }
    }
}
// Expose message types and node states as static members so that users of the
// module can compare against them without hard-coding the strings.
Object.assign(TinyRaft, {
    VOTE_REQUEST,
    VOTE,
    PING,
    PONG,
    CANDIDATE,
    LEADER,
    FOLLOWER,
});
module.exports = TinyRaft;