// TinyRaft - Raft leader election isolated from the rest of the algorithm
//
// Can be used as a building block for the standard Raft with a K/V database
// and log-based replication or for other variations of "weak consensus".
//
// Leader is not required to replicate logs - he just has to support a recovery
// procedure making all replicas consistent. For example, he can use a CRDT or
// perform erasure coding on entries. He may use term number as logical time.
// The only requirement is to guarantee preservation of entries confirmed by
// all hosts participating in consensus.
//
// Supports leader expiration like in NuRaft:
// https://github.com/eBay/NuRaft/blob/master/docs/leadership_expiration.md
//
// Also supports leader priorities, similar to NuRaft but even simpler:
// If a node receives a VoteRequest message with larger term but with smaller
// priority than its own, it immediately starts a new voting round.
// It guarantees that a node with non-maximum priority can't become leader
// without being re-elected.
// If all priorities are equal (or just zero), the election algorithm
// becomes identical to the basic algorithm without priorities.

const EventEmitter = require('events');

// Message types
const VOTE_REQUEST = 'vote_request';
const VOTE = 'vote';
const PING = 'ping';
const PONG = 'pong';

// Node states
const CANDIDATE = 'candidate';
const LEADER = 'leader';
const FOLLOWER = 'follower';

/**
 * Leader election state machine.
 *
 * Outgoing messages are delivered through the user-supplied `send(to, msg)`
 * callback; incoming messages must be fed into `onReceive(from, msg)`.
 * Every state transition is reported with a 'change' event.
 *
 * Node identifiers are compared with loose equality on purpose: the config
 * allows both numbers and strings, and identifiers are also used as object keys.
 */
class TinyRaft extends EventEmitter {
    // config: {
    //   nodes: (number|string)[],
    //   nodeId: number|string,
    //   send: function(to: number|string, msg: object),
    //   electionTimeout?: number,    // default 5000
    //   randomTimeout?: number,      // default: same as electionTimeout
    //   heartbeatTimeout?: number,   // default 1000
    //   leadershipTimeout?: number,  // default 0 (leader expiration disabled)
    //   initialTerm?: number,        // default 0, negative values are reset to 0
    //   leaderPriority?: number,
    // }
    // Throws an Error when nodeId, nodes or send are missing or malformed.
    constructor(config) {
        super();
        this.nodes = config.nodes;
        this.nodeId = config.nodeId;
        this.send = config.send;
        this.electionTimeout = Number(config.electionTimeout) || 5000;
        this.randomTimeout = config.randomTimeout > 0
            ? Number(config.randomTimeout)
            : this.electionTimeout;
        this.heartbeatTimeout = Number(config.heartbeatTimeout) || 1000;
        this.leadershipTimeout = Number(config.leadershipTimeout) || 0;
        this.leaderPriority = Number(config.leaderPriority) || undefined;
        if (!this.nodeId || this.nodeId instanceof Object ||
            !(this.nodes instanceof Array) ||
            this.nodes.filter(n => !n || n instanceof Object).length > 0 ||
            !(this.send instanceof Function)) {
            throw new Error('nodeId, nodes, send are required fields of `config`');
        }
        this.term = Number(config.initialTerm) || 0;
        if (this.term < 0) {
            this.term = 0;
        }
        this.state = null;
        this.leader = null;
        // Initialise the voting state and timers up front so that messages
        // arriving before start() can't crash on undefined fields.
        this.votes = {};
        this.voted = 0;
        this.followers = null;
        this.electionTimer = null;
        this.heartbeatTimer = null;
    }

    /**
     * Cancels the pending election timer and, when `after` >= 0, schedules
     * start() to run after `after` ms plus a random delay of up to
     * `randomTimeout` ms. A negative `after` only cancels the timer.
     */
    _nextTerm(after) {
        if (this.electionTimer) {
            clearTimeout(this.electionTimer);
            this.electionTimer = null;
        }
        if (after >= 0) {
            this.electionTimer = setTimeout(
                () => this.start(),
                after + this.randomTimeout * Math.random()
            );
        }
    }

    /**
     * Starts a new voting round: increments the term, votes for itself,
     * becomes a candidate, makes sure the heartbeat interval is running,
     * sends VOTE_REQUEST to every other node and emits 'change'.
     */
    start() {
        this._nextTerm(-1);
        this.term++;
        this.voted = 1;
        this.votes = { [this.nodeId]: [ this.nodeId ] };
        this.state = CANDIDATE;
        this.followers = null;
        this.leader = this.nodeId;
        if (!this.heartbeatTimer) {
            this.heartbeatTimer = setInterval(() => this._heartbeat(), this.heartbeatTimeout);
        }
        for (const node of this.nodes) {
            if (node != this.nodeId) {
                this.send(node, {
                    type: VOTE_REQUEST,
                    term: this.term,
                    leader: this.leader,
                    priority: this.leaderPriority,
                });
            }
        }
        // Next term will start right after this one times out
        this._nextTerm(this.electionTimeout);
        this.emit('change', { state: this.state, term: this.term });
    }

    /**
     * Cancels the election timer and the heartbeat interval.
     */
    stop() {
        if (this.electionTimer) {
            clearTimeout(this.electionTimer);
            this.electionTimer = null;
        }
        if (this.heartbeatTimer) {
            // The heartbeat is created with setInterval, so clear it as an interval
            clearInterval(this.heartbeatTimer);
            this.heartbeatTimer = null;
        }
    }

    /**
     * Heartbeat tick. Does nothing unless this node is the leader; then it
     * sends PING to every other node, or only to the current followers when
     * leadershipTimeout is non-zero.
     */
    _heartbeat() {
        if (this.state != LEADER) {
            return;
        }
        const targets = this.leadershipTimeout ? this.followers : this.nodes;
        for (const node of targets) {
            if (node != this.nodeId) {
                this.send(node, { type: PING, term: this.term, priority: this.leaderPriority });
            }
        }
    }

    /**
     * Entry point for incoming messages: dispatches on `msg.type`.
     * Messages of unknown type are ignored.
     */
    onReceive(from, msg) {
        if (msg.type == VOTE_REQUEST) {
            this._onReceiveVoteRequest(from, msg);
        } else if (msg.type == VOTE) {
            this._onReceiveVote(from, msg);
        } else if (msg.type == PING) {
            this._onReceivePing(from, msg);
        } else if (msg.type == PONG) {
            this._onReceivePong(from, msg);
        }
    }

    /**
     * Handles VOTE_REQUEST. A request with a newer term is either countered
     * with an own voting round (when the requester's priority is lower than
     * ours) or accepted by adopting its term and leader. In every case a VOTE
     * carrying the current term and leader is sent back to `from`.
     */
    _onReceiveVoteRequest(from, msg) {
        if (msg.term > this.term && msg.leader) {
            if (this.leaderPriority && (msg.priority || 0) < this.leaderPriority) {
                // start() increments the term, so our round outranks the requester's
                this.term = msg.term;
                this.start();
            } else {
                this.leader = msg.leader;
                this.term = msg.term;
                this.state = CANDIDATE;
                this._nextTerm(this.heartbeatTimeout * 2 + this.electionTimeout);
                this.emit('change', { state: this.state, term: this.term, leader: this.leader });
            }
        }
        const prio = (this.leader == this.nodeId ? this.leaderPriority : undefined);
        // NOTE(review): this VOTE uses the field name `leaderPriority`, while the
        // VOTE repeated in _onReceiveVote uses `priority`. Nothing in this file
        // reads either field, so the wire format is left untouched.
        this.send(from, { type: VOTE, term: this.term, leader: this.leader, leaderPriority: prio });
    }

    /**
     * Handles VOTE. Stale votes and votes without a leader are ignored.
     * A vote with a newer term turns this node into a follower of the voted
     * leader. A vote for the current term is counted, after which the node
     * may become leader, extend its follower list, or schedule a new round.
     */
    _onReceiveVote(from, msg) {
        if (!msg.leader || msg.term < this.term) {
            return;
        }
        if (msg.term > this.term) {
            this.term = msg.term;
            this.state = FOLLOWER;
            this.followers = null;
            this.leader = msg.leader;
            this.votes = {};
            this.emit('change', { state: this.state, term: this.term, leader: this.leader });
            // Repeat VOTE to the leader to join it
            this.send(this.leader, {
                type: VOTE,
                term: this.term,
                leader: this.leader,
                priority: msg.priority,
            });
            return;
        }
        // Add `from` as a voter for msg.leader (each voter is counted once)
        this.votes[msg.leader] = this.votes[msg.leader] || [];
        const voters = this.votes[msg.leader];
        if (!voters.some(voter => voter == from)) {
            this.voted++;
            voters.push(from);
        }
        const n = voters.length;
        if (n == 1 + (0 | this.nodes.length / 2)) {
            // msg.leader has just reached the majority
            if (msg.leader == this.nodeId) {
                this.leader = msg.leader;
                this.state = LEADER;
                this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
                this.followers = this.votes[this.nodeId];
                // Send a heartbeat to confirm leadership
                this._heartbeat();
                this.emit('change', {
                    state: this.state,
                    term: this.term,
                    leader: this.nodeId,
                    followers: this.votes[this.nodeId],
                });
            } else {
                this._nextTerm(0);
            }
        } else if (n > this.nodes.length / 2 && this.state == LEADER && msg.leader == this.nodeId) {
            // One more follower joined an already elected leader
            this.followers = this.votes[this.nodeId];
            // Send a heartbeat to confirm leadership
            this.send(from, { type: PING, term: this.term, priority: this.leaderPriority });
            this.emit('change', {
                state: this.state,
                term: this.term,
                leader: this.nodeId,
                followers: this.votes[this.nodeId],
            });
        } else if (this._isVotingFailed()) {
            this._nextTerm(0);
        }
    }

    /**
     * Handles PING. A candidate pinged by its leader in the same term becomes
     * a follower; a follower pinged by its leader re-arms its election timer.
     * When leadershipTimeout is enabled, a PONG is sent back to `from`.
     */
    _onReceivePing(from, msg) {
        if (this.state == CANDIDATE && this.term == msg.term && from == this.leader) {
            this.state = FOLLOWER;
            this.emit('change', { state: this.state, term: this.term, leader: this.leader });
        }
        if (this.state == FOLLOWER && from == this.leader) {
            this.markAlive();
        }
        if (this.leadershipTimeout > 0) {
            this.send(from, { type: PONG, term: this.term, leader: this.leader });
        }
    }

    /**
     * Handles PONG (only meaningful for a leader). A PONG naming another
     * leader triggers a new voting round; otherwise the leadership
     * expiration timer is re-armed.
     */
    _onReceivePong(from, msg) {
        if (this.state != LEADER) {
            return;
        }
        if (msg.leader != this.nodeId) {
            this.start();
        } else {
            this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
        }
    }

    /**
     * Marks the leader as alive: clears the `leaderTimedOut` flag and
     * re-arms the election timer.
     */
    markAlive() {
        this.leaderTimedOut = false;
        this._nextTerm(this.heartbeatTimeout * 2 + this.electionTimeout);
    }

    /**
     * Returns true when no candidate can reach a majority any more, i.e. for
     * every candidate the votes already received plus all votes not yet cast
     * do not exceed half of the nodes.
     */
    _isVotingFailed() {
        const outstanding = this.nodes.length - this.voted;
        for (const id in this.votes) {
            // votes[id] is the list of voters, so its length is the vote count
            if (this.votes[id].length + outstanding > this.nodes.length / 2) {
                return false;
            }
        }
        return true;
    }

    /**
     * Replaces the list of cluster nodes. An empty list is treated as a
     * single-node cluster consisting of this node.
     *
     * A leader drops followers that are no longer members and either starts
     * a new voting round (when it lost the majority) or emits 'change' with
     * the updated follower list. A follower whose leader was removed and
     * any candidate start a new voting round.
     */
    setNodes(nodes) {
        if (!nodes.length) {
            nodes = [ this.nodeId ];
        }
        this.nodes = nodes;
        if (this.state == LEADER) {
            const remaining = this.votes[this.nodeId].filter(n => nodes.indexOf(n) >= 0);
            this.votes[this.nodeId] = remaining;
            // Keep the heartbeat target list in sync with the filtered voter list
            this.followers = remaining;
            if (remaining.length < (1 + (0 | this.nodes.length / 2))) {
                this.start();
            } else {
                this.emit('change', {
                    state: this.state,
                    term: this.term,
                    leader: this.nodeId,
                    followers: remaining,
                });
            }
        } else if ((this.state == FOLLOWER && nodes.indexOf(this.leader) < 0) ||
            this.state == CANDIDATE) {
            this.start();
        }
    }
}

TinyRaft.VOTE_REQUEST = VOTE_REQUEST;
TinyRaft.VOTE = VOTE;
TinyRaft.PING = PING;
TinyRaft.PONG = PONG;
TinyRaft.CANDIDATE = CANDIDATE;
TinyRaft.LEADER = LEADER;
TinyRaft.FOLLOWER = FOLLOWER;

module.exports = TinyRaft;