Compare commits

..

No commits in common. "9674117da72521cecd82408b47d299221532467d" and "9bf5db2272b1bad1488dd27a7a31ba908920d2ff" have entirely different histories.

3 changed files with 54 additions and 126 deletions

View File

@ -1,44 +0,0 @@
// Fake timer for node.js which _guarantees_ that all asynchronous timer
// callbacks are called during a tested period
function FakeTimer(startTime)
{
this.time = Number(startTime)||0;
this.timers = [];
this.id = 0;
this.setTimeout = (func, ms) =>
{
this.timers.push({ id: ++this.id, func, at: this.time+ms });
return this.id;
};
this.setInterval = (func, ms) =>
{
this.timers.push({ id: ++this.id, func, at: this.time+ms, repeat: ms });
return this.id;
};
this.clearInterval = this.clearTimeout = (id) =>
{
this.timers = this.timers.filter(t => t.id != id);
};
this.setImmediate = (func) => this.setTimeout(func, 0);
this.runFor = async (ms) =>
{
const end = this.time+ms;
this.timers.sort((a, b) => a.at - b.at);
while (this.timers.length && this.timers[0].at <= end)
{
const { func, at, repeat } = this.timers[0];
if (!repeat)
this.timers.shift();
else
this.timers[0].at += repeat;
if (at > this.time)
this.time = at;
func();
this.timers.sort((a, b) => a.at - b.at);
}
this.time = end;
};
}
module.exports = FakeTimer;

View File

@ -62,19 +62,18 @@ class TinyRaft extends EventEmitter
this.term = 0; this.term = 0;
this.state = null; this.state = null;
this.leader = null; this.leader = null;
this.confirmed = {};
} }
_nextTerm(after) _nextTerm(after)
{ {
if (this.electionTimer) if (this.electionTimer)
{ {
TinyRaft.clearTimeout(this.electionTimer); clearTimeout(this.electionTimer);
this.electionTimer = null; this.electionTimer = null;
} }
if (after >= 0) if (after >= 0)
{ {
this.electionTimer = TinyRaft.setTimeout(() => this.start(), after + this.randomTimeout * Math.random()); this.electionTimer = setTimeout(() => this.start(), after + this.randomTimeout * Math.random());
} }
} }
@ -87,11 +86,10 @@ class TinyRaft extends EventEmitter
this.votes = { [this.nodeId]: [ this.nodeId ] }; this.votes = { [this.nodeId]: [ this.nodeId ] };
this.state = CANDIDATE; this.state = CANDIDATE;
this.followers = null; this.followers = null;
this.confirmed = {};
this.leader = this.nodeId; this.leader = this.nodeId;
if (!this.heartbeatTimer) if (!this.heartbeatTimer)
{ {
this.heartbeatTimer = TinyRaft.setInterval(() => this._heartbeat(), this.heartbeatTimeout); this.heartbeatTimer = setInterval(() => this._heartbeat(), this.heartbeatTimeout);
} }
for (const node of this.nodes) for (const node of this.nodes)
{ {
@ -109,12 +107,12 @@ class TinyRaft extends EventEmitter
{ {
if (this.electionTimer) if (this.electionTimer)
{ {
TinyRaft.clearTimeout(this.electionTimer); clearTimeout(this.electionTimer);
this.electionTimer = null; this.electionTimer = null;
} }
if (this.heartbeatTimer) if (this.heartbeatTimer)
{ {
TinyRaft.clearInterval(this.heartbeatTimer); clearTimeout(this.heartbeatTimer);
this.heartbeatTimer = null; this.heartbeatTimer = null;
} }
} }
@ -123,7 +121,6 @@ class TinyRaft extends EventEmitter
{ {
if (this.state == LEADER) if (this.state == LEADER)
{ {
this.confirmed = {};
for (const node of (this.leadershipTimeout ? this.followers : this.nodes)) for (const node of (this.leadershipTimeout ? this.followers : this.nodes))
{ {
if (node != this.nodeId) if (node != this.nodeId)
@ -168,8 +165,6 @@ class TinyRaft extends EventEmitter
this.leader = msg.leader; this.leader = msg.leader;
this.term = msg.term; this.term = msg.term;
this.state = CANDIDATE; this.state = CANDIDATE;
this.followers = null;
this.confirmed = {};
this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout); this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
this.emit('change', { state: this.state, term: this.term, leader: this.leader }); this.emit('change', { state: this.state, term: this.term, leader: this.leader });
} }
@ -189,7 +184,6 @@ class TinyRaft extends EventEmitter
this.term = msg.term; this.term = msg.term;
this.state = FOLLOWER; this.state = FOLLOWER;
this.followers = null; this.followers = null;
this.confirmed = {};
this.leader = msg.leader; this.leader = msg.leader;
this.votes = {}; this.votes = {};
this.emit('change', { state: this.state, term: this.term, leader: this.leader }); this.emit('change', { state: this.state, term: this.term, leader: this.leader });
@ -220,7 +214,6 @@ class TinyRaft extends EventEmitter
{ {
this.leader = msg.leader; this.leader = msg.leader;
this.state = LEADER; this.state = LEADER;
this.confirmed = {};
this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1); this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
this.followers = this.votes[this.nodeId]; this.followers = this.votes[this.nodeId];
// Send a heartbeat to confirm leadership // Send a heartbeat to confirm leadership
@ -256,7 +249,7 @@ class TinyRaft extends EventEmitter
{ {
this.markAlive(); this.markAlive();
} }
if (this.leadershipTimeout > 0 || this.state == FOLLOWER && from != this.leader) if (this.leadershipTimeout > 0)
{ {
this.send(from, { type: PONG, term: this.term, leader: this.leader }); this.send(from, { type: PONG, term: this.term, leader: this.leader });
} }
@ -268,18 +261,13 @@ class TinyRaft extends EventEmitter
{ {
return; return;
} }
if (msg.leader != this.nodeId || msg.term != this.term) if (msg.leader != this.nodeId)
{ {
this.start(); this.start();
} }
else if (this.leadershipTimeout > 0) else
{ {
this.confirmed[from] = true; this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
if (Object.keys(this.confirmed).length >= (1 + (0 | this.nodes.length/2)))
{
// We have quorum
this._nextTerm(this.leadershipTimeout);
}
} }
} }
@ -323,10 +311,6 @@ class TinyRaft extends EventEmitter
} }
} }
TinyRaft.setTimeout = setTimeout;
TinyRaft.clearTimeout = clearTimeout;
TinyRaft.setInterval = setInterval;
TinyRaft.clearInterval = clearInterval;
TinyRaft.VOTE_REQUEST = VOTE_REQUEST; TinyRaft.VOTE_REQUEST = VOTE_REQUEST;
TinyRaft.VOTE = VOTE; TinyRaft.VOTE = VOTE;
TinyRaft.PING = PING; TinyRaft.PING = PING;

View File

@ -1,11 +1,4 @@
const TinyRaft = require('./tinyraft.js'); const TinyRaft = require('./tinyraft.js');
const FakeTimer = require('./faketimer.js');
const fake = new FakeTimer();
TinyRaft.setTimeout = fake.setTimeout;
TinyRaft.clearTimeout = fake.clearTimeout;
TinyRaft.setInterval = fake.setInterval;
TinyRaft.clearInterval = fake.clearInterval;
function newNode(id, nodes, partitions, mod) function newNode(id, nodes, partitions, mod)
{ {
@ -19,8 +12,8 @@ function newNode(id, nodes, partitions, mod)
{ {
if (!partitions[n.nodeId+'-'+to] && !partitions[to+'-'+n.nodeId] && nodes[to]) if (!partitions[n.nodeId+'-'+to] && !partitions[to+'-'+n.nodeId] && nodes[to])
{ {
console.log('['+fake.time+'] received from '+n.nodeId+' to '+to+': '+JSON.stringify(msg)); console.log('received from '+n.nodeId+' to '+to+': '+JSON.stringify(msg));
fake.setImmediate(function() { nodes[to].onReceive(n.nodeId, msg); }); setImmediate(function() { nodes[to].onReceive(n.nodeId, msg); });
} }
}, },
}; };
@ -30,7 +23,7 @@ function newNode(id, nodes, partitions, mod)
n.on('change', (st) => n.on('change', (st) =>
{ {
console.log( console.log(
'['+fake.time+'] node '+n.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+ 'node '+n.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+
', term '+st.term+(st.state == TinyRaft.LEADER ? ', followers: '+st.followers.join(', ') : '') ', term '+st.term+(st.state == TinyRaft.LEADER ? ', followers: '+st.followers.join(', ') : '')
); );
}); });
@ -55,7 +48,6 @@ function newNodes(count, partitions, mod)
function checkQuorum(nodes, count) function checkQuorum(nodes, count)
{ {
let leaders = 0; let leaders = 0;
let out = 0;
for (const i in nodes) for (const i in nodes)
{ {
if (nodes[i].state == TinyRaft.LEADER) if (nodes[i].state == TinyRaft.LEADER)
@ -66,8 +58,6 @@ function checkQuorum(nodes, count)
} }
else if (nodes[i].state != TinyRaft.FOLLOWER) else if (nodes[i].state != TinyRaft.FOLLOWER)
{ {
out++;
if (out > nodes.length-count)
throw new Error('node '+i+' is not in quorum: state '+nodes[i].state); throw new Error('node '+i+' is not in quorum: state '+nodes[i].state);
} }
} }
@ -75,7 +65,7 @@ function checkQuorum(nodes, count)
{ {
throw new Error('we have '+leaders+' leaders instead of exactly 1'); throw new Error('we have '+leaders+' leaders instead of exactly 1');
} }
console.log('['+fake.time+'] OK: '+count+' nodes in quorum'); console.log('OK: '+count+' nodes in quorum');
} }
function checkNoQuorum(nodes) function checkNoQuorum(nodes)
@ -85,12 +75,11 @@ function checkNoQuorum(nodes)
{ {
throw new Error('we have non-candidates ('+nc.map(n => n.nodeId).join(', ')+'), but we should not'); throw new Error('we have non-candidates ('+nc.map(n => n.nodeId).join(', ')+'), but we should not');
} }
console.log('['+fake.time+'] OK: '+Object.keys(nodes).length+' candidates, no quorum'); console.log('OK: '+Object.keys(nodes).length+' candidates, no quorum');
} }
async function testStartThenRemoveNode() async function testStartThenRemoveNode()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testStartThenRemoveNode'); console.log('testStartThenRemoveNode');
const nodes = newNodes(5); const nodes = newNodes(5);
let leaderChanges = 0, prevLeader = null; let leaderChanges = 0, prevLeader = null;
@ -102,7 +91,7 @@ async function testStartThenRemoveNode()
leaderChanges++; leaderChanges++;
} }
}); });
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
if (leaderChanges >= 3) if (leaderChanges >= 3)
{ {
@ -114,7 +103,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop(); nodes[leader].stop();
delete nodes[leader]; delete nodes[leader];
// Check quorum after 2000ms // Check quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 4); checkQuorum(nodes, 4);
// Stop the leader again // Stop the leader again
leader = nodes[Object.keys(nodes)[0]].leader; leader = nodes[Object.keys(nodes)[0]].leader;
@ -122,7 +111,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop(); nodes[leader].stop();
delete nodes[leader]; delete nodes[leader];
// Check quorum after 2000ms // Check quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 3); checkQuorum(nodes, 3);
// Stop the leader again // Stop the leader again
leader = nodes[Object.keys(nodes)[0]].leader; leader = nodes[Object.keys(nodes)[0]].leader;
@ -130,7 +119,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop(); nodes[leader].stop();
delete nodes[leader]; delete nodes[leader];
// Check that no quorum exists // Check that no quorum exists
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkNoQuorum(nodes); checkNoQuorum(nodes);
// Clean up // Clean up
for (const id in nodes) for (const id in nodes)
@ -142,10 +131,9 @@ async function testStartThenRemoveNode()
async function testAddNode() async function testAddNode()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testAddNode'); console.log('testAddNode');
const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000); const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
// Add node // Add node
newNode(6, nodes); newNode(6, nodes);
@ -153,7 +141,7 @@ async function testAddNode()
nodes[i].setNodes([ 1, 2, 3, 4, 5, 6 ]); nodes[i].setNodes([ 1, 2, 3, 4, 5, 6 ]);
nodes[6].start(); nodes[6].start();
// Check quorum after 2000ms // Check quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 6); checkQuorum(nodes, 6);
// Clean up // Clean up
for (const id in nodes) for (const id in nodes)
@ -165,23 +153,22 @@ async function testAddNode()
async function testLeadershipExpiration() async function testLeadershipExpiration()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testLeadershipExpiration'); console.log('testLeadershipExpiration');
const partitions = {}; const partitions = {};
const nodes = newNodes(5, partitions, cfg => cfg.leadershipTimeout = 1500); const nodes = newNodes(5, partitions, cfg => cfg.leadershipTimeout = 1500);
// Check that 5 nodes are in quorum after 2000ms // Check that 5 nodes are in quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
// Break network on the leader // Break network on the leader
let leader = nodes[1].leader; let leader = nodes[1].leader;
console.log("["+fake.time+"] --> stopping the leader's ("+leader+") network"); console.log("stopping the leader's ("+leader+") network");
for (let i = 1; i <= 5; i++) for (let i = 1; i <= 5; i++)
{ {
partitions[i+'-'+leader] = true; partitions[i+'-'+leader] = true;
partitions[leader+'-'+i] = true; partitions[leader+'-'+i] = true;
} }
// Check that the leader loses leadership after 2 * leadershipTimeout // Check that the leader loses leadership after 2 * leadershipTimeout
await fake.runFor(3000); await new Promise(ok => setTimeout(ok, 3000));
if (nodes[leader].state != TinyRaft.CANDIDATE) if (nodes[leader].state != TinyRaft.CANDIDATE)
{ {
throw new Error("leadership expiration doesn't work"); throw new Error("leadership expiration doesn't work");
@ -196,7 +183,6 @@ async function testLeadershipExpiration()
async function testRestart() async function testRestart()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testRestart'); console.log('testRestart');
const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000); const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
let leaderChanges = 0, prevLeader = null; let leaderChanges = 0, prevLeader = null;
@ -210,7 +196,7 @@ async function testRestart()
} }
}); });
// Check that 5 nodes are in quorum after 2000ms // Check that 5 nodes are in quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
if (leaderChanges >= 3) if (leaderChanges >= 3)
{ {
@ -222,18 +208,18 @@ async function testRestart()
{ {
restarted = 1 + (prevLeader + 1) % 5; restarted = 1 + (prevLeader + 1) % 5;
} }
console.log("["+fake.time+"] --> stopping a follower (node "+restarted+")"); console.log("stopping a follower (node "+restarted+")");
nodes[restarted].stop(); nodes[restarted].stop();
delete nodes[restarted]; delete nodes[restarted];
// Wait 2000ms // Wait 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
// Restart a follower // Restart a follower
console.log("["+fake.time+"] --> restarting a follower (node "+restarted+")"); console.log("restarting a follower (node "+restarted+")");
leaderChanges = 0; leaderChanges = 0;
newNode(restarted, nodes, {}, null); newNode(restarted, nodes, {}, null);
nodes[restarted].start(); nodes[restarted].start();
// Check quorum and the fact that the leader didn't change after 2000ms // Check quorum and the fact that the leader didn't change after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
if (leaderChanges > 0) if (leaderChanges > 0)
{ {
@ -249,21 +235,20 @@ async function testRestart()
async function testChangeNodes() async function testChangeNodes()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testChangeNodes'); console.log('testChangeNodes');
console.log('['+fake.time+'] --> starting nodes 1-5'); console.log('starting nodes 1-5');
const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000); const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
// Check that 5 nodes are in quorum after 2000ms // Check that 5 nodes are in quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
// Stop node 4 // Stop node 4
console.log('['+fake.time+'] --> stopping node 4'); console.log('stopping node 4');
nodes[4].stop(); nodes[4].stop();
delete nodes[4]; delete nodes[4];
// Wait 1000ms // Wait 1000ms
await fake.runFor(1000); await new Promise(ok => setTimeout(ok, 1000));
// Change nodes from 1 2 3 4 5 to 1 2 3 5 6 // Change nodes from 1 2 3 4 5 to 1 2 3 5 6
console.log('['+fake.time+'] --> starting node 6'); console.log('starting node 6');
newNode(6, nodes); newNode(6, nodes);
nodes[6].start(); nodes[6].start();
nodes[1].setNodes([ 1, 2, 3, 5, 6 ]); nodes[1].setNodes([ 1, 2, 3, 5, 6 ]);
@ -272,7 +257,7 @@ async function testChangeNodes()
nodes[5].setNodes([ 1, 2, 3, 5, 6 ]); nodes[5].setNodes([ 1, 2, 3, 5, 6 ]);
nodes[6].setNodes([ 1, 2, 3, 5, 6 ]); nodes[6].setNodes([ 1, 2, 3, 5, 6 ]);
// Check that 5 nodes are in quorum after 2000ms // Check that 5 nodes are in quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
// Clean up // Clean up
for (const id in nodes) for (const id in nodes)
@ -285,34 +270,33 @@ async function testChangeNodes()
async function testLeaderPriority() async function testLeaderPriority()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testLeaderPriority'); console.log('testLeaderPriority');
console.log('['+fake.time+'] --> starting nodes 1-5'); console.log('starting nodes 1-5');
const nodes = newNodes(5, {}, cfg => cfg.leaderPriority = cfg.nodeId+1); const nodes = newNodes(5, {}, cfg => cfg.leaderPriority = cfg.nodeId+1);
// Check that 5 nodes are in quorum after 2000ms // Check that 5 nodes are in quorum after 2000ms
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 5); checkQuorum(nodes, 5);
if (nodes[1].leader != 5) if (nodes[1].leader != 5)
{ {
throw new Error('leader is not 5'); throw new Error('leader is not 5');
} }
// Stop node 5 // Stop node 5
console.log('['+fake.time+'] --> stopping node 5'); console.log('stopping node 5');
nodes[5].stop(); nodes[5].stop();
delete nodes[5]; delete nodes[5];
// Wait 2000ms and check that the leader is now 4 // Wait 2000ms and check that the leader is now 4
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 4); checkQuorum(nodes, 4);
if (nodes[1].leader != 4) if (nodes[1].leader != 4)
{ {
throw new Error('leader is not 4'); throw new Error('leader is not 4');
} }
// Stop node 4 // Stop node 4
console.log('['+fake.time+'] --> stopping node 4'); console.log('stopping node 4');
nodes[4].stop(); nodes[4].stop();
delete nodes[4]; delete nodes[4];
// Wait 2000ms and check that the leader is now 3 // Wait 2000ms and check that the leader is now 3
await fake.runFor(2000); await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 3); checkQuorum(nodes, 3);
if (nodes[1].leader != 3) if (nodes[1].leader != 3)
{ {
@ -329,7 +313,6 @@ async function testLeaderPriority()
async function testPartition1_3() async function testPartition1_3()
{ {
console.log('--------------------------------------------------------------------------------');
console.log('testPartition1_3'); console.log('testPartition1_3');
const partitions = { '1-3': true, '3-1': true }; const partitions = { '1-3': true, '3-1': true };
const nodes = newNodes(3, partitions, cfg => cfg.nodes = [ 1, 2, 3 ]); const nodes = newNodes(3, partitions, cfg => cfg.nodes = [ 1, 2, 3 ]);
@ -342,19 +325,24 @@ async function testPartition1_3()
leaderChanges++; leaderChanges++;
} }
}); });
// Check that 2 or 3 nodes are in quorum after 5000ms // Check that 3 nodes are in quorum after 2000ms
// This situation should be fixed by "prevote protocol", but it breaks other things // 2 is always elected as leader because it's the first node that can determine
await fake.runFor(5000); // that the first voting round has failed: it receives a vote for 1 from 1 and
if (nodes[2].leader == 2) // a vote for 3 from 3. So it knows that no node can get 2 votes. 1 and 3, at
// the same time, don't know it for sure because they don't receive failures
// from each other due to the network partition.
await new Promise(ok => setTimeout(ok, 2000));
checkQuorum(nodes, 3); checkQuorum(nodes, 3);
else if (leaderChanges >= 3)
checkQuorum(nodes, 2); {
throw new Error('More than 3 leader changes in 2000ms: '+leaderChanges);
}
// Clean up // Clean up
for (const id in nodes) for (const id in nodes)
{ {
nodes[id].stop(); nodes[id].stop();
} }
console.log('testPartition1_3: OK'); console.log('testLeadershipExpiration: OK');
} }
async function run() async function run()