const TinyRaft = require('./tinyraft.js');

/**
 * Resolve after the given delay.
 *
 * @param {number} ms - delay in milliseconds
 * @returns {Promise<void>}
 */
function sleep(ms) {
    return new Promise(ok => setTimeout(ok, ms));
}

/**
 * Create a TinyRaft node connected to an in-memory "network" and register it
 * in `nodes` under its id. The node is NOT started.
 *
 * @param {number} id - id of the new node
 * @param {Object} nodes - shared map of id => node; used to deliver messages
 * @param {Object} [partitions] - shared map of 'from-to' => true; a message is
 *     dropped when either direction of the pair is present
 * @param {Function} [mod] - optional callback that may modify the config
 *     object before the node is constructed
 */
function newNode(id, nodes, partitions, mod) {
    partitions = partitions || {};
    const cfg = {
        nodes: [ 1, 2, 3, 4, 5 ],
        nodeId: id,
        heartbeatTimeout: 100,
        electionTimeout: 500,
        send: function(to, msg) {
            if (!partitions[n.nodeId+'-'+to] && !partitions[to+'-'+n.nodeId] && nodes[to]) {
                console.log('received from '+n.nodeId+' to '+to+': '+JSON.stringify(msg));
                setImmediate(function() {
                    // The receiver may have been removed from `nodes` between
                    // send() and the actual delivery; drop the message then
                    // instead of throwing an uncaught TypeError.
                    const receiver = nodes[to];
                    if (receiver) {
                        receiver.onReceive(n.nodeId, msg);
                    }
                });
            }
        },
    };
    if (mod) {
        mod(cfg);
    }
    const n = new TinyRaft(cfg);
    n.on('change', (st) => {
        console.log(
            'node '+n.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+
            ', term '+st.term+(st.state == TinyRaft.LEADER ? ', followers: '+st.followers.join(', ') : '')
        );
    });
    nodes[id] = n;
}

/**
 * Create nodes 1..count sharing one node map and one partition map, then
 * start all of them.
 *
 * @param {number} count - number of nodes to create
 * @param {Object} [partitions] - shared partition map passed to every node
 * @param {Function} [mod] - optional config modifier passed to every node
 * @returns {Object} map of id => node
 */
function newNodes(count, partitions, mod) {
    partitions = partitions || {};
    const nodes = {};
    for (let i = 1; i <= count; i++) {
        newNode(i, nodes, partitions, mod);
    }
    // Start only after all nodes are registered
    for (let i = 1; i <= count; i++) {
        nodes[i].start();
    }
    return nodes;
}

/**
 * Assert that every node in `nodes` is either a leader or a follower, that
 * there is exactly one leader and that it reports `count` followers.
 *
 * @param {Object} nodes - map of id => node
 * @param {number} count - expected length of the leader's follower list
 * @throws {Error} when any of the conditions does not hold
 */
function checkQuorum(nodes, count) {
    let leaders = 0;
    for (const i in nodes) {
        if (nodes[i].state == TinyRaft.LEADER) {
            leaders++;
            if (nodes[i].followers.length !== count) {
                throw new Error('leader '+i+' has '+nodes[i].followers.length+' followers, but should have '+count);
            }
        } else if (nodes[i].state != TinyRaft.FOLLOWER) {
            throw new Error('node '+i+' is not in quorum: state '+nodes[i].state);
        }
    }
    if (leaders !== 1) {
        throw new Error('we have '+leaders+' leaders instead of exactly 1');
    }
    console.log('OK: '+count+' nodes in quorum');
}

/**
 * Assert that every node in `nodes` is a candidate.
 *
 * @param {Object} nodes - map of id => node
 * @throws {Error} when at least one node is not in the CANDIDATE state
 */
function checkNoQuorum(nodes) {
    const nc = Object.values(nodes).filter(n => n.state != TinyRaft.CANDIDATE);
    if (nc.length > 0) {
        throw new Error('we have non-candidates ('+nc.map(n => n.nodeId).join(', ')+'), but we should not');
    }
    console.log('OK: '+Object.keys(nodes).length+' candidates, no quorum');
}

/**
 * Silence a node's message handler, stop it and remove it from `nodes`.
 *
 * @param {Object} nodes - map of id => node
 * @param {number|string} id - id of the node to remove
 */
function stopAndRemoveNode(nodes, id) {
    nodes[id].onReceive = () => {};
    nodes[id].stop();
    delete nodes[id];
}

/**
 * Start 5 nodes, then stop the current leader three times in a row. Quorum is
 * expected with 5, 4 and 3 nodes and no quorum with the remaining 2.
 */
async function testStartThenRemoveNode() {
    console.log('testStartThenRemoveNode');
    const nodes = newNodes(5);
    let leaderChanges = 0;
    let prevLeader = null;
    nodes[2].on('change', (st) => {
        if (st.leader && st.leader != prevLeader) {
            prevLeader = st.leader;
            leaderChanges++;
        }
    });
    await sleep(2000);
    checkQuorum(nodes, 5);
    if (leaderChanges >= 3) {
        throw new Error('Too many leader changes in 2000ms (expected < 3): '+leaderChanges);
    }
    // Stop the leader
    let leader = nodes[1].leader;
    stopAndRemoveNode(nodes, leader);
    // Check quorum after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 4);
    // Stop the leader again (node 1 may be gone, so ask any remaining node)
    leader = nodes[Object.keys(nodes)[0]].leader;
    stopAndRemoveNode(nodes, leader);
    // Check quorum after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 3);
    // Stop the leader again
    leader = nodes[Object.keys(nodes)[0]].leader;
    stopAndRemoveNode(nodes, leader);
    // Check that no quorum exists
    await sleep(2000);
    checkNoQuorum(nodes);
    // Clean up
    for (const id in nodes) {
        nodes[id].stop();
    }
    console.log('testStartThenRemoveNode: OK');
}

/**
 * Start 5 nodes with initialTerm = 1000, then add a 6th node and expect a
 * quorum of 6.
 */
async function testAddNode() {
    console.log('testAddNode');
    const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
    await sleep(2000);
    checkQuorum(nodes, 5);
    // Add node
    newNode(6, nodes);
    for (let i = 1; i <= 5; i++) {
        nodes[i].setNodes([ 1, 2, 3, 4, 5, 6 ]);
    }
    nodes[6].start();
    // Check quorum after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 6);
    // Clean up
    for (const id in nodes) {
        nodes[id].stop();
    }
    console.log('testAddNode: OK');
}

/**
 * Start 5 nodes with leadershipTimeout = 1500, cut the leader off from the
 * network and expect it to become a candidate within 3000ms.
 */
async function testLeadershipExpiration() {
    console.log('testLeadershipExpiration');
    const partitions = {};
    const nodes = newNodes(5, partitions, cfg => cfg.leadershipTimeout = 1500);
    // Check that 5 nodes are in quorum after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 5);
    // Break network on the leader
    const leader = nodes[1].leader;
    console.log("stopping the leader's ("+leader+") network");
    for (let i = 1; i <= 5; i++) {
        partitions[i+'-'+leader] = true;
        partitions[leader+'-'+i] = true;
    }
    // Check that the leader loses leadership after 2 * leadershipTimeout
    await sleep(3000);
    if (nodes[leader].state != TinyRaft.CANDIDATE) {
        throw new Error("leadership expiration doesn't work");
    }
    // Clean up
    for (const id in nodes) {
        nodes[id].stop();
    }
    console.log('testLeadershipExpiration: OK');
}

/**
 * Start 5 nodes with initialTerm = 1000, stop one follower, re-create it with
 * the default config and expect a quorum of 5 again without node 2 observing
 * any leader change.
 */
async function testRestart() {
    console.log('testRestart');
    const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
    let leaderChanges = 0;
    let prevLeader = null;
    nodes[2].on('change', (st) => {
        const leader = st.state == TinyRaft.CANDIDATE ? null : st.leader;
        if (leader != prevLeader) {
            prevLeader = leader;
            leaderChanges++;
        }
    });
    // Check that 5 nodes are in quorum after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 5);
    if (leaderChanges >= 3) {
        throw new Error("leaderChanges = "+leaderChanges+" (expected < 3)");
    }
    // Stop a follower: take the node after the leader, skipping node 2
    // because the change listener above is attached to it
    let restarted = 1 + (prevLeader % 5);
    if (restarted == 2) {
        restarted = 1 + (prevLeader + 1) % 5;
    }
    console.log("stopping a follower (node "+restarted+")");
    nodes[restarted].stop();
    delete nodes[restarted];
    // Wait 2000ms
    await sleep(2000);
    // Restart a follower
    console.log("restarting a follower (node "+restarted+")");
    leaderChanges = 0;
    newNode(restarted, nodes, {}, null);
    nodes[restarted].start();
    // Check quorum and the fact that the leader didn't change after 2000ms
    await sleep(2000);
    checkQuorum(nodes, 5);
    if (leaderChanges > 0) {
        throw new Error("leader changed after restart of a follower");
    }
    // Clean up
    for (const id in nodes) {
        nodes[id].stop();
    }
    console.log('testRestart: OK');
}

/**
 * Run all tests sequentially and exit with code 0 when they all pass.
 */
async function run() {
    await testStartThenRemoveNode();
    await testAddNode();
    await testLeadershipExpiration();
    await testRestart();
    process.exit(0);
}

// Any test failure is printed and turned into a non-zero exit code
run().catch(err => {
    console.error(err);
    process.exit(1);
});