From 3056f3ef7dee3d1ecb76f6034e6b6fe746165aa5 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 4 May 2024 02:28:56 +0300 Subject: [PATCH] WIP Raft --- antietcd.js | 277 +++++++++++++++++++++++++++++++++++++++++++++++++--- etctree.js | 99 +++++++++++-------- 2 files changed, 322 insertions(+), 54 deletions(-) diff --git a/antietcd.js b/antietcd.js index e61157a..f573e94 100644 --- a/antietcd.js +++ b/antietcd.js @@ -9,6 +9,7 @@ const zlib = require('zlib'); const ws = require('ws'); const EtcTree = require('./etctree.js'); +const TinyRaft = require('./tinyraft.js'); class RequestError { @@ -26,6 +27,9 @@ class AntiEtcd this.clients = {}; this.client_id = 1; this.etctree = new EtcTree(true); + this.cluster_connections = {}; + this.raft = null; + this.stored_term = 0; this.cfg = cfg; this.loading = false; this.stopped = false; @@ -50,17 +54,44 @@ class AntiEtcd this.loading = true; this.etctree.load(data); this.loading = false; + this.stored_term = data['term']; } else if (err.code != 'ENOENT') { throw err; } // Set exit hook - const on_stop_cb = () => this.on_stop(); + const on_stop_cb = () => this.onStop(); process.on('SIGINT', on_stop_cb); process.on('SIGTERM', on_stop_cb); process.on('SIGQUIT', on_stop_cb); } + if (this.cfg.cluster) + { + if (!this.cfg.node_id || !this.cfg.cluster_key) + { + throw new Error('node_id and cluster_key are required in configuration if cluster is set'); + } + if (!(this.cfg.cluster instanceof Object)) + { + this.cfg.cluster = (''+this.cfg.cluster).trim().split(/[\s,]*,[\s,]*/) + .reduce((a, c) => { c = c.split(/\s*=\s*/); a[c[0]] = c[1]; return a; }, {}); + } + this.raft = new TinyRaft({ + nodes: Object.keys(this.cfg.cluster), + nodeId: this.cfg.node_id, + heartbeatTimeout: this.cfg.heartbeat_timeout, + electionTimeout: this.cfg.election_timeout, + initialTerm: this.stored_term, + send: (to, msg) => this.sendRaftMessage(to, msg), + }); + this.raft.on('change', (event) => this.handleRaftChange(event)); + // Connect to all nodes and reconnect forever + for (const node_id in this.cfg.cluster) + { + this.connectToNode(node_id); + } + } if (this.cfg.cert) { this.tls = { key: await fsp.readFile(this.cfg.key), cert: await fsp.readFile(this.cfg.cert) }; @@ -71,11 +102,30 @@ class AntiEtcd this.server = http.createServer((req, res) => this.handleRequest(req, res)); } this.wss = new ws.WebSocketServer({ server: this.server }); - this.wss.on('connection', (conn, req) => this.startWebsocket(conn, req)); + this.wss.on('connection', (conn, req) => this.startWebsocket(conn, null)); this.server.listen(this.cfg.port || 2379); } - async on_stop() + connectToNode(node_id) + { + if (node_id != this.cfg.node_id && this.cfg.cluster[node_id] && + (!this.cluster_connections[node_id] || !this.clients[this.cluster_connections[node_id]])) + { + const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws')); + const client_id = this.startWebsocket(socket, () => this.connectToNode(node_id)); + this.cluster_connections[node_id] = client_id; + socket.on('open', () => + { + if (this.clients[client_id]) + { + this.clients[client_id].ready = true; + socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } })); + } + }); + } + } + + async onStop() { if (this.stopped) { @@ -157,6 +207,7 @@ class AntiEtcd try { let dump = this.etctree.dump(true); + dump['term'] = this.stored_term; dump = JSON.stringify(dump); dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res))); const fh = await fsp.open(this.cfg.filename+'.tmp', 'w'); @@ -181,11 +232,116 @@ class AntiEtcd } } - fail(res, code, text) + handleRaftChange(event) { - res.writeHead(code); - res.write(text); - res.end(); + if (event.state == TinyRaft.LEADER) + { + // (Re)sync with the new set of followers + this.resync(followers); + } + else + { + this.etctree.pause_leases(); + } + } + + resync(followers) + { + this.synced = false; + if (!this.resync_state) + { + this.resync_state = { + dumps: {}, + wait_load: 0, + }; + this.resync_state.dumps[this.cfg.node_id] = { ...this.etctree.dump(), term: this.stored_term }; + } + const seen = {}; + for (const f of followers) + { + seen[f] = true; + if (!(f in this.resync_state.dumps)) + { + const client = this.getPeer(f); + if (client) + { + client.socket.send(JSON.stringify({ dump: {} })); + } + this.resync_state.dumps[f] = null; + } + } + for (const f in this.resync_state.dumps) + { + if (!seen[f]) + { + delete this.resync_state.dumps[f]; + } + } + this.continueResync(); + } + + continueResync() + { + let max_term = -1, with_max = []; + for (const follower in this.resync_state.dumps) + { + const dump = this.resync_state.dumps[follower]; + if (!dump) + { + // Some dump(s) are still pending + return; + } + if (dump.term > max_term) + { + max_term = dump.term; + with_max = [ follower ]; + } + else if (dump.term == max_term) + { + with_max.push(follower); + } + } + if (max_term < 0 || with_max.length == 0) + { + throw new Error('BUG: no max term during resync'); + } + // Merge databases of all nodes with maximum term + // Force other nodes to replicate the merged DB, throwing away their own states + for (let i = 0; i < with_max.length; i++) + { + const update_only = (i == 0 && this.stored_term != max_term); + this.etctree.load(this.resync_state.dumps[with_max[0]], update_only); + } + const load_request = JSON.stringify({ load: this.etctree.dump() }); + for (const follower in this.resync_state.dumps) + { + if (follower != this.cfg.node_id) + { + const dump = this.resync_state.dumps[follower]; + if (dump.term < max_term) + { + const client = this.getPeer(follower); + if (client) + { + client.socket.send(load_request); + } + this.resync_state.wait_load++; + } + } + } + this.finishResync(); + } + + finishResync() + { + if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0 || + this.resync_state.wait_load > 0) + { + return; + } + // All current peers have copied the database, we can proceed + this.stored_term = this.resync_state.max_term; + this.synced = true; } handleRequest(req, res) @@ -270,10 +426,11 @@ class AntiEtcd }); } - startWebsocket(socket, req) + startWebsocket(socket, reconnect) { const client_id = this.client_id++; this.clients[client_id] = { + id: client_id, socket, alive: true, watches: {}, @@ -319,7 +476,12 @@ class AntiEtcd clearInterval(pinger); delete this.clients[client_id]; socket.terminate(); + if (reconnect) + { + reconnect(); + } }); + return client_id; } async runHandler(req, data, res) @@ -340,6 +502,29 @@ class AntiEtcd if (requestUrl.pathname.substr(0, 4) == '/v3/') { const path = requestUrl.pathname.substr(4).replace(/\/+$/, '').replace(/\/+/g, '_'); + if (this.raft) + { + if (requestUrl.searchParams.get('leaderonly') && + this.raft.state != TinyRaft.LEADER) + { + throw new RequestError(503, 'Not leader'); + } + if (this.raft.state == TinyRaft.CANDIDATE) + { + throw new RequestError(503, 'Quorum not available'); + } + else if (this.raft.state == TinyRaft.FOLLOWER && + (!this.cfg.stale_read || this.isWrite(path, data))) + { + // Forward to leader + return await this.forwardToLeader(requestUrl.pathname, data) + } + else if (!this.synced) + { + // Wait for initial sync for read-only requests + await new Promise(ok => this.waitSync.push(ok)); + } + } const cb = this['handle_'+path]; if (cb) { @@ -403,16 +588,17 @@ class AntiEtcd handle_dump(data) { - return this.etctree.dump(); + return { ...this.etctree.dump(), term: this.stored_term }; } handleMessage(client_id, msg, socket) { + const client = this.clients[client_id]; if (msg.create_request) { // FIXME progress_notify, filters, prev_kv const create_request = msg.create_request; - if (!create_request.watch_id || !this.clients[client_id].watches[create_request.watch_id]) + if (!create_request.watch_id || !client.watches[create_request.watch_id]) { const watch = this.etctree.api_create_watch( { ...create_request, watch_id: null }, (msg) => socket.send(JSON.stringify(msg)) @@ -424,18 +610,18 @@ class AntiEtcd else { create_request.watch_id ||= watch.watch_id; - this.clients[client_id].watches[create_request.watch_id] = watch.watch_id; + client.watches[create_request.watch_id] = watch.watch_id; socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, created: true } })); } } } else if (msg.cancel_request) { - const mapped_id = this.clients[client_id].watches[msg.cancel_request.watch_id]; + const mapped_id = client.watches[msg.cancel_request.watch_id]; if (mapped_id) { this.etctree.api_cancel_watch({ watch_id: mapped_id }); - delete this.clients[client_id].watches[msg.cancel_request.watch_id]; + delete client.watches[msg.cancel_request.watch_id]; socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: msg.cancel_request.watch_id, canceled: true } })); } } @@ -443,6 +629,47 @@ class AntiEtcd { socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision } } })); } + if (!this.raft) + { + return; + } + if (msg.raft) + { + if (client.raft_node_id) + { + this.raft.onReceive(client.raft_node_id, msg.raft); + } + } + else if (msg.identify) + { + if (msg.identify.key === this.cfg.cluster_key && + msg.identify.node_id != this.cfg.node_id) + { + client.raft_node_id = msg.identify.node_id; + } + } + else if (msg.dump) + { + socket.send(JSON.stringify({ dump_reply: { ...this.etctree.dump(), term: this.stored_term } })); + } + else if (msg.dump_reply) + { + if (this.resync_state && client.raft_node_id && + (client.raft_node_id in this.resync_state.dumps)) + { + this.resync_state.dumps[client.raft_node_id] = msg.dump_reply; + this.continueResync(); + } + } + else if (msg.load) + { + if (client.raft_node_id) + { + this.etctree.load(msg.load); + this.stored_term = msg.load.term; + socket.send(JSON.stringify({ load_reply: { request_id: msg.load.request_id } })); + } + } } unsubscribeClient(client_id) @@ -455,6 +682,30 @@ class AntiEtcd this.etctree.api_cancel_watch({ watch_id: mapped_id }); } } + + getPeer(to) + { + const client_id = this.cluster_connections[to]; + if (!client_id) + { + return null; + } + const client = this.clients[client_id]; + if (!client || !client.ready) + { + return null; + } + return client; + } + + sendRaftMessage(to, msg) + { + const client = getPeer(to); + if (client) + { + client.socket.send(JSON.stringify({ raft: msg })); + } + } } function vitastor_persist_filter(prefix) diff --git a/etctree.js b/etctree.js index 64b3f5b..8b3b4a0 100644 --- a/etctree.js +++ b/etctree.js @@ -192,18 +192,21 @@ class EtcTree } // load snapshot of all data including leases - load(snapshot) + load(snapshot, update_only) { this.mod_revision = snapshot.mod_revision; // First apply leases - for (const id in this.leases) + if (!update_only && snapshot.leases) { - if (!snapshot.leases[id]) + for (const id in this.leases) { - this.api_revoke_lease(id); + if (!snapshot.leases[id]) + { + this.api_revoke_lease(id); + } } } - for (const id in snapshot.leases) + for (const id in snapshot.leases||{}) { if (!this.leases[id]) { @@ -231,39 +234,42 @@ class EtcTree } // Then find and apply the difference in data const notifications = []; - this._restore_diff(this.state, snapshot.state, null, this.state.watchers || [], notifications); + this._restore_diff(update_only, this.state, snapshot.state, null, this.state.watchers || [], notifications); this.notify(notifications); } - _restore_diff(cur_old, cur_new, prefix, watchers, notifications) + _restore_diff(update_only, cur_old, cur_new, prefix, watchers, notifications) { - const key = prefix === null ? '' : prefix; - if (!eq(cur_old.lease, cur_new.lease)) + if (!update_only || !cur_old.mod_revision || cur_old.mod_revision < cur_new.mod_revision) { - if (cur_old.lease && this.leases[cur_old.lease]) + const key = prefix === null ? '' : prefix; + if (!eq(cur_old.lease, cur_new.lease)) { - delete this.leases[cur_old.lease].keys[key]; - } - cur_old.lease = cur_new.lease; - if (cur_new.lease && this.leases[cur_new.lease]) - { - this.leases[cur_new.lease].keys[key] = true; - } - } - cur_old.mod_revision = cur_new.mod_revision; - cur_old.create_revision = cur_new.create_revision; - cur_old.version = cur_new.version; - if (!eq(cur_old.value, cur_new.value)) - { - cur_old.value = cur_new.value; - for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers)) - { - const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision }; - if (cur_old.lease) + if (cur_old.lease && this.leases[cur_old.lease]) { - notify.lease = cur_old.lease; + delete this.leases[cur_old.lease].keys[key]; + } + cur_old.lease = cur_new.lease; + if (cur_new.lease && this.leases[cur_new.lease]) + { + this.leases[cur_new.lease].keys[key] = true; + } + } + cur_old.mod_revision = cur_new.mod_revision; + cur_old.create_revision = cur_new.create_revision; + cur_old.version = cur_new.version; + if (!eq(cur_old.value, cur_new.value)) + { + cur_old.value = cur_new.value; + for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers)) + { + const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision }; + if (cur_old.lease) + { + notify.lease = cur_old.lease; + } + notifications.push(notify); } - notifications.push(notify); } } cur_old.children ||= {}; @@ -276,25 +282,28 @@ class EtcTree else { this._restore_diff( - cur_old.children[k], cur_new.children[k], + update_only, cur_old.children[k], cur_new.children[k], prefix === null ? k : prefix+'/'+k, cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers, notifications ); } } - for (const k in cur_old.children) + if (!update_only) { - if (!cur_new.children[k]) + for (const k in cur_old.children) { - // Delete subtree - this.delete_all( - notifications, - cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers, - cur_old.children[k], true, - prefix === null ? k : prefix+'/'+k, - this.mod_revision - ); + if (!cur_new.children[k]) + { + // Delete subtree + this.delete_all( + notifications, + cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers, + cur_old.children[k], true, + prefix === null ? k : prefix+'/'+k, + this.mod_revision + ); + } } } } @@ -302,6 +311,10 @@ class EtcTree // slave/follower nodes don't expire leases themselves, they listen for the leader instead pause_leases() { + if (this.paused) + { + return; + } this.paused = true; for (const id in this.leases) { @@ -316,6 +329,10 @@ class EtcTree resume_leases() { + if (!this.paused) + { + return; + } this.paused = false; for (const id in this.leases) {