Vitaliy Filippov 2024-05-04 02:28:56 +03:00
parent a59d8c5e5f
commit 3056f3ef7d
2 changed files with 322 additions and 54 deletions

View File

@ -9,6 +9,7 @@ const zlib = require('zlib');
const ws = require('ws');
const EtcTree = require('./etctree.js');
const TinyRaft = require('./tinyraft.js');
class RequestError
{
@ -26,6 +27,9 @@ class AntiEtcd
this.clients = {};
this.client_id = 1;
this.etctree = new EtcTree(true);
this.cluster_connections = {};
this.raft = null;
this.stored_term = 0;
this.cfg = cfg;
this.loading = false;
this.stopped = false;
@ -50,17 +54,44 @@ class AntiEtcd
this.loading = true;
this.etctree.load(data);
this.loading = false;
this.stored_term = data['term'];
}
else if (err.code != 'ENOENT')
{
throw err;
}
// Set exit hook
const on_stop_cb = () => this.on_stop();
const on_stop_cb = () => this.onStop();
process.on('SIGINT', on_stop_cb);
process.on('SIGTERM', on_stop_cb);
process.on('SIGQUIT', on_stop_cb);
}
if (this.cfg.cluster)
{
if (!this.cfg.node_id || !this.cfg.cluster_key)
{
throw new Error('node_id and cluster_key are required in configuration if cluster is set');
}
if (!(this.cfg.cluster instanceof Object))
{
this.cfg.cluster = (''+this.cfg.cluster).trim().split(/[\s,]*,[\s,]*/)
.reduce((a, c) => { c = c.split(/\s*=\s*/); a[c[0]] = c[1]; return a; }, {});
}
this.raft = new TinyRaft({
nodes: Object.keys(this.cfg.cluster),
nodeId: this.cfg.node_id,
heartbeatTimeout: this.cfg.heartbeat_timeout,
electionTimeout: this.cfg.election_timeout,
initialTerm: this.stored_term,
send: (to, msg) => this.sendRaftMessage(to, msg),
});
this.raft.on('change', (event) => this.handleRaftChange(event));
// Connect to all nodes and reconnect forever
for (const node_id in this.cfg.cluster)
{
this.connectToNode(node_id);
}
}
if (this.cfg.cert)
{
this.tls = { key: await fsp.readFile(this.cfg.key), cert: await fsp.readFile(this.cfg.cert) };
@ -71,11 +102,30 @@ class AntiEtcd
this.server = http.createServer((req, res) => this.handleRequest(req, res));
}
this.wss = new ws.WebSocketServer({ server: this.server });
this.wss.on('connection', (conn, req) => this.startWebsocket(conn, req));
this.wss.on('connection', (conn, req) => this.startWebsocket(conn, null));
this.server.listen(this.cfg.port || 2379);
}
async on_stop()
connectToNode(node_id)
{
if (node_id != this.cfg.node_id && this.cfg.cluster[node_id] &&
(!this.cluster_connections[node_id] || !this.clients[this.cluster_connections[node_id]]))
{
const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws'));
const client_id = this.startWebsocket(socket, () => this.connectToNode(node_id));
this.cluster_connections[node_id] = client_id;
socket.on('open', () =>
{
if (this.clients[client_id])
{
this.clients[client_id].ready = true;
socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } }));
}
});
}
}
async onStop()
{
if (this.stopped)
{
@ -157,6 +207,7 @@ class AntiEtcd
try
{
let dump = this.etctree.dump(true);
dump['term'] = this.stored_term;
dump = JSON.stringify(dump);
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
const fh = await fsp.open(this.cfg.filename+'.tmp', 'w');
@ -181,11 +232,116 @@ class AntiEtcd
}
}
fail(res, code, text)
handleRaftChange(event)
{
res.writeHead(code);
res.write(text);
res.end();
if (event.state == TinyRaft.LEADER)
{
// (Re)sync with the new set of followers
this.resync(followers);
}
else
{
this.etctree.pause_leases();
}
}
resync(followers)
{
this.synced = false;
if (!this.resync_state)
{
this.resync_state = {
dumps: {},
wait_load: 0,
};
this.resync_state.dumps[this.cfg.node_id] = { ...this.etctree.dump(), term: this.stored_term };
}
const seen = {};
for (const f of followers)
{
seen[f] = true;
if (!(f in this.resync_state.dumps))
{
const client = this.getPeer(f);
if (client)
{
client.socket.send(JSON.stringify({ dump: {} }));
}
this.resync_state.dumps[f] = null;
}
}
for (const f in this.resync_state.dumps)
{
if (!seen[f])
{
delete this.resync_state.dumps[f];
}
}
this.continueResync();
}
continueResync()
{
let max_term = -1, with_max = [];
for (const follower in this.resync_state.dumps)
{
const dump = this.resync_state.dumps[follower];
if (!dump)
{
// Some dump(s) are still pending
return;
}
if (dump.term > max_term)
{
max_term = dump.term;
with_max = [ follower ];
}
else if (dump.term == max_term)
{
with_max.push(follower);
}
}
if (max_term < 0 || with_max.length == 0)
{
throw new Error('BUG: no max term during resync');
}
// Merge databases of all nodes with maximum term
// Force other nodes to replicate the merged DB, throwing away their own states
for (let i = 0; i < with_max.length; i++)
{
const update_only = (i == 0 && this.stored_term != max_term);
this.etctree.load(this.resync_state.dumps[with_max[0]], update_only);
}
const load_request = JSON.stringify({ load: this.etctree.dump() });
for (const follower in this.resync_state.dumps)
{
if (follower != this.cfg.node_id)
{
const dump = this.resync_state.dumps[follower];
if (dump.term < max_term)
{
const client = this.getPeer(follower);
if (client)
{
client.socket.send(load_request);
}
this.resync_state.wait_load++;
}
}
}
this.finishResync();
}
finishResync()
{
if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0 ||
this.resync_state.wait_load > 0)
{
return;
}
// All current peers have copied the database, we can proceed
this.stored_term = this.resync_state.max_term;
this.synced = true;
}
handleRequest(req, res)
@ -270,10 +426,11 @@ class AntiEtcd
});
}
startWebsocket(socket, req)
startWebsocket(socket, reconnect)
{
const client_id = this.client_id++;
this.clients[client_id] = {
id: client_id,
socket,
alive: true,
watches: {},
@ -319,7 +476,12 @@ class AntiEtcd
clearInterval(pinger);
delete this.clients[client_id];
socket.terminate();
if (reconnect)
{
reconnect();
}
});
return client_id;
}
async runHandler(req, data, res)
@ -340,6 +502,29 @@ class AntiEtcd
if (requestUrl.pathname.substr(0, 4) == '/v3/')
{
const path = requestUrl.pathname.substr(4).replace(/\/+$/, '').replace(/\/+/g, '_');
if (this.raft)
{
if (requestUrl.searchParams.get('leaderonly') &&
this.raft.state != TinyRaft.LEADER)
{
throw new RequestError(503, 'Not leader');
}
if (this.raft.state == TinyRaft.CANDIDATE)
{
throw new RequestError(503, 'Quorum not available');
}
else if (this.raft.state == TinyRaft.FOLLOWER &&
(!this.cfg.stale_read || this.isWrite(path, data)))
{
// Forward to leader
return await this.forwardToLeader(requestUrl.pathname, data)
}
else if (!this.synced)
{
// Wait for initial sync for read-only requests
await new Promise(ok => this.waitSync.push(ok));
}
}
const cb = this['handle_'+path];
if (cb)
{
@ -403,16 +588,17 @@ class AntiEtcd
handle_dump(data)
{
return this.etctree.dump();
return { ...this.etctree.dump(), term: this.stored_term };
}
handleMessage(client_id, msg, socket)
{
const client = this.clients[client_id];
if (msg.create_request)
{
// FIXME progress_notify, filters, prev_kv
const create_request = msg.create_request;
if (!create_request.watch_id || !this.clients[client_id].watches[create_request.watch_id])
if (!create_request.watch_id || !client.watches[create_request.watch_id])
{
const watch = this.etctree.api_create_watch(
{ ...create_request, watch_id: null }, (msg) => socket.send(JSON.stringify(msg))
@ -424,18 +610,18 @@ class AntiEtcd
else
{
create_request.watch_id ||= watch.watch_id;
this.clients[client_id].watches[create_request.watch_id] = watch.watch_id;
client.watches[create_request.watch_id] = watch.watch_id;
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, created: true } }));
}
}
}
else if (msg.cancel_request)
{
const mapped_id = this.clients[client_id].watches[msg.cancel_request.watch_id];
const mapped_id = client.watches[msg.cancel_request.watch_id];
if (mapped_id)
{
this.etctree.api_cancel_watch({ watch_id: mapped_id });
delete this.clients[client_id].watches[msg.cancel_request.watch_id];
delete client.watches[msg.cancel_request.watch_id];
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: msg.cancel_request.watch_id, canceled: true } }));
}
}
@ -443,6 +629,47 @@ class AntiEtcd
{
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision } } }));
}
if (!this.raft)
{
return;
}
if (msg.raft)
{
if (client.raft_node_id)
{
this.raft.onReceive(client.raft_node_id, msg.raft);
}
}
else if (msg.identify)
{
if (msg.identify.key === this.cfg.cluster_key &&
msg.identify.node_id != this.cfg.node_id)
{
client.raft_node_id = msg.identify.node_id;
}
}
else if (msg.dump)
{
socket.send(JSON.stringify({ dump_reply: { ...this.etctree.dump(), term: this.stored_term } }));
}
else if (msg.dump_reply)
{
if (this.resync_state && client.raft_node_id &&
(client.raft_node_id in this.resync_state.dumps))
{
this.resync_state.dumps[client.raft_node_id] = msg.dump_reply;
this.continueResync();
}
}
else if (msg.load)
{
if (client.raft_node_id)
{
this.etctree.load(msg.load);
this.stored_term = msg.load.term;
socket.send(JSON.stringify({ load_reply: { request_id: msg.load.request_id } }));
}
}
}
unsubscribeClient(client_id)
@ -455,6 +682,30 @@ class AntiEtcd
this.etctree.api_cancel_watch({ watch_id: mapped_id });
}
}
getPeer(to)
{
const client_id = this.cluster_connections[to];
if (!client_id)
{
return null;
}
const client = this.clients[client_id];
if (!client || !client.ready)
{
return null;
}
return client;
}
sendRaftMessage(to, msg)
{
const client = getPeer(to);
if (client)
{
client.socket.send(JSON.stringify({ raft: msg }));
}
}
}
function vitastor_persist_filter(prefix)

View File

@ -192,18 +192,21 @@ class EtcTree
}
// load snapshot of all data including leases
load(snapshot)
load(snapshot, update_only)
{
this.mod_revision = snapshot.mod_revision;
// First apply leases
for (const id in this.leases)
if (!update_only && snapshot.leases)
{
if (!snapshot.leases[id])
for (const id in this.leases)
{
this.api_revoke_lease(id);
if (!snapshot.leases[id])
{
this.api_revoke_lease(id);
}
}
}
for (const id in snapshot.leases)
for (const id in snapshot.leases||{})
{
if (!this.leases[id])
{
@ -231,39 +234,42 @@ class EtcTree
}
// Then find and apply the difference in data
const notifications = [];
this._restore_diff(this.state, snapshot.state, null, this.state.watchers || [], notifications);
this._restore_diff(update_only, this.state, snapshot.state, null, this.state.watchers || [], notifications);
this.notify(notifications);
}
_restore_diff(cur_old, cur_new, prefix, watchers, notifications)
_restore_diff(update_only, cur_old, cur_new, prefix, watchers, notifications)
{
const key = prefix === null ? '' : prefix;
if (!eq(cur_old.lease, cur_new.lease))
if (!update_only || !cur_old.mod_revision || cur_old.mod_revision < cur_new.mod_revision)
{
if (cur_old.lease && this.leases[cur_old.lease])
const key = prefix === null ? '' : prefix;
if (!eq(cur_old.lease, cur_new.lease))
{
delete this.leases[cur_old.lease].keys[key];
}
cur_old.lease = cur_new.lease;
if (cur_new.lease && this.leases[cur_new.lease])
{
this.leases[cur_new.lease].keys[key] = true;
}
}
cur_old.mod_revision = cur_new.mod_revision;
cur_old.create_revision = cur_new.create_revision;
cur_old.version = cur_new.version;
if (!eq(cur_old.value, cur_new.value))
{
cur_old.value = cur_new.value;
for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers))
{
const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision };
if (cur_old.lease)
if (cur_old.lease && this.leases[cur_old.lease])
{
notify.lease = cur_old.lease;
delete this.leases[cur_old.lease].keys[key];
}
cur_old.lease = cur_new.lease;
if (cur_new.lease && this.leases[cur_new.lease])
{
this.leases[cur_new.lease].keys[key] = true;
}
}
cur_old.mod_revision = cur_new.mod_revision;
cur_old.create_revision = cur_new.create_revision;
cur_old.version = cur_new.version;
if (!eq(cur_old.value, cur_new.value))
{
cur_old.value = cur_new.value;
for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers))
{
const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision };
if (cur_old.lease)
{
notify.lease = cur_old.lease;
}
notifications.push(notify);
}
notifications.push(notify);
}
}
cur_old.children ||= {};
@ -276,25 +282,28 @@ class EtcTree
else
{
this._restore_diff(
cur_old.children[k], cur_new.children[k],
update_only, cur_old.children[k], cur_new.children[k],
prefix === null ? k : prefix+'/'+k,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
notifications
);
}
}
for (const k in cur_old.children)
if (!update_only)
{
if (!cur_new.children[k])
for (const k in cur_old.children)
{
// Delete subtree
this.delete_all(
notifications,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
cur_old.children[k], true,
prefix === null ? k : prefix+'/'+k,
this.mod_revision
);
if (!cur_new.children[k])
{
// Delete subtree
this.delete_all(
notifications,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
cur_old.children[k], true,
prefix === null ? k : prefix+'/'+k,
this.mod_revision
);
}
}
}
}
@ -302,6 +311,10 @@ class EtcTree
// slave/follower nodes don't expire leases themselves, they listen for the leader instead
pause_leases()
{
if (this.paused)
{
return;
}
this.paused = true;
for (const id in this.leases)
{
@ -316,6 +329,10 @@ class EtcTree
resume_leases()
{
if (!this.paused)
{
return;
}
this.paused = false;
for (const id in this.leases)
{