559 lines
17 KiB
JavaScript
559 lines
17 KiB
JavaScript
#!/usr/bin/node
|
|
|
|
const fsp = require('fs').promises;
|
|
const { URL } = require('url');
|
|
const http = require('http');
|
|
const https = require('https');
|
|
|
|
const ws = require('ws');
|
|
|
|
const EtcTree = require('./etctree.js');
|
|
const AntiPersistence = require('./antipersistence.js');
|
|
const AntiCluster = require('./anticluster.js');
|
|
const { runCallbacks, RequestError } = require('./common.js');
|
|
|
|
/**
 * HTTP + websocket server exposing a subset of the etcd v3 JSON API on top of
 * an in-memory EtcTree, with optional persistence (AntiPersistence) and
 * optional clustering (AntiCluster).
 */
class AntiEtcd
{
    /**
     * @param {object} cfg - parsed options. Fields read by this class:
     *   data, cluster, cert, key, port, ws_keepalive_interval.
     *   The same object is kept as this.cfg (this instance is also handed to
     *   AntiPersistence and AntiCluster).
     */
    constructor(cfg)
    {
        // Connected websocket clients keyed by numeric client id
        this.clients = {};
        // Next client id to hand out
        this.client_id = 1;
        this.etctree = new EtcTree(true);
        this.persistence = null;
        this.cluster = null;
        this.stored_term = 0;
        this.cfg = cfg;
        this.loading = false;
        // Set once shutdown has started; new HTTP requests are refused afterwards
        this.stopped = false;
        // Number of HTTP requests currently being processed
        this.inflight = 0;
        // Callbacks to run when inflight drops to zero (used by onStop)
        this.wait_inflight = [];
    }

    /**
     * Start the server: set up persistence and clustering when configured,
     * then create the HTTP(S) and websocket servers and start listening on
     * cfg.port (2379 when not set).
     *
     * @returns {Promise<void>}
     */
    async run()
    {
        if (this.cfg.data || this.cfg.cluster)
        {
            this.etctree.set_replicate_watcher(msg => this.persistAndReplicate(msg));
        }
        if (this.cfg.data)
        {
            this.persistence = new AntiPersistence(this);
            // Load data from disk
            await this.persistence.load();
            // Set exit hook
            const on_stop_cb = () => this.onStop();
            process.on('SIGINT', on_stop_cb);
            process.on('SIGTERM', on_stop_cb);
            process.on('SIGQUIT', on_stop_cb);
        }
        if (this.cfg.cluster)
        {
            this.cluster = new AntiCluster(this);
        }
        if (this.cfg.cert)
        {
            this.tls = { key: await fsp.readFile(this.cfg.key), cert: await fsp.readFile(this.cfg.cert) };
            this.server = https.createServer(this.tls, (req, res) => this.handleRequest(req, res));
        }
        else
        {
            this.server = http.createServer((req, res) => this.handleRequest(req, res));
        }
        this.wss = new ws.WebSocketServer({ server: this.server });
        // eslint-disable-next-line no-unused-vars
        this.wss.on('connection', (conn, req) => this.startWebsocket(conn, null));
        this.server.listen(this.cfg.port || 2379);
    }

    /**
     * Signal handler: refuse new requests, wait for in-flight HTTP requests,
     * persist once more when persistence is enabled, then exit the process.
     * Calls after the first one are ignored.
     *
     * @returns {Promise<void>}
     */
    async onStop()
    {
        if (this.stopped)
        {
            return;
        }
        this.stopped = true;
        // Wait until all requests complete
        while (this.inflight > 0)
        {
            await new Promise(ok => this.wait_inflight.push(ok));
        }
        if (this.persistence)
        {
            await this.persistence.persist();
        }
        process.exit(0);
    }

    /**
     * Replicate watcher installed on the EtcTree by run(): forwards a change
     * to the cluster and to the persistence layer, whichever are enabled.
     *
     * @param {object} msg - change message produced by EtcTree
     * @returns {Promise<void>}
     * @throws the first rejection reason if replication or persistence fails
     *   (all rejection reasons are logged first)
     */
    async persistAndReplicate(msg)
    {
        const pending = [];
        if (this.cluster)
        {
            // We have to guarantee that replication is processed sequentially
            // So we have to send messages without first awaiting for anything!
            pending.push(this.cluster.replicateChange(msg));
        }
        if (this.persistence)
        {
            pending.push(this.persistence.persistChange(msg));
        }
        if (!pending.length)
        {
            return;
        }
        const settled = await Promise.allSettled(pending);
        const errors = settled.filter(r => r.status === 'rejected');
        if (errors.length)
        {
            for (const e of errors)
            {
                console.error(e.reason);
            }
            throw errors[0].reason;
        }
    }

    /**
     * HTTP request entry point. Collects the body, requires a JSON object
     * payload, dispatches to runHandler() and writes either the JSON reply or
     * a plain-text error. Every request is written to the access log.
     *
     * @param {http.IncomingMessage} req
     * @param {http.ServerResponse} res
     */
    handleRequest(req, res)
    {
        const chunks = [];
        req.on('data', (chunk) => chunks.push(chunk));
        req.on('end', async () =>
        {
            this.inflight++;
            try
            {
                let body = '';
                let code = 200;
                let ctype = 'text/plain; charset=utf-8';
                let reply;
                try
                {
                    // Compare only the media type so that parameters such as
                    // "; charset=utf-8" and letter case do not cause a rejection
                    const media_type = String(req.headers['content-type'] || '').split(';')[0].trim().toLowerCase();
                    if (media_type !== 'application/json')
                    {
                        throw new RequestError(400, 'content-type should be application/json');
                    }
                    const raw = Buffer.concat(chunks);
                    body = raw.toString();
                    let data;
                    try
                    {
                        data = raw.length ? JSON.parse(body) : {};
                    }
                    catch (e)
                    {
                        throw new RequestError(400, 'body should be valid JSON');
                    }
                    if (!(data instanceof Object) || data instanceof Array)
                    {
                        throw new RequestError(400, 'body should be JSON object');
                    }
                    reply = JSON.stringify(await this.runHandler(req, data, res));
                    if (reply === undefined)
                    {
                        // JSON.stringify() yields undefined for undefined/function results;
                        // send JSON null instead of failing to answer at all
                        reply = 'null';
                    }
                    ctype = 'application/json';
                }
                catch (e)
                {
                    if (e instanceof RequestError)
                    {
                        code = e.code;
                        reply = String(e.message);
                    }
                    else
                    {
                        console.error(e);
                        code = 500;
                        // Thrown values are not guaranteed to be Error objects
                        reply = 'Internal error: '+(e instanceof Error ? e.message : String(e));
                    }
                }
                try
                {
                    // Access log
                    console.log(
                        new Date().toISOString()+
                        ' '+(req.headers['x-forwarded-for'] || (req.socket.remoteAddress + ':' + req.socket.remotePort))+
                        ' '+req.method+' '+req.url+' '+code+'\n  '+body.replace(/\n/g, '\\n')+
                        '\n  '+reply.replace(/\n/g, '\\n')
                    );
                }
                catch (e)
                {
                    // A logging problem must not prevent the response from being sent
                    console.error(e);
                }
                try
                {
                    const payload = Buffer.from(reply);
                    res.writeHead(code, {
                        'Content-Type': ctype,
                        'Content-Length': payload.length,
                    });
                    res.write(payload);
                    res.end();
                }
                catch (e)
                {
                    console.error(e);
                }
            }
            finally
            {
                // Always release the in-flight slot, otherwise onStop() would wait forever
                this.inflight--;
                if (!this.inflight)
                {
                    runCallbacks(this, 'wait_inflight', []);
                }
            }
        });
    }

    /**
     * Register a websocket connection as a client, start keepalive pings and
     * route incoming messages to handleMessage().
     *
     * @param {object} socket - websocket connection object
     * @param {Function|null} reconnect - called after the socket closes, when set
     * @returns {number} id assigned to the new client
     */
    startWebsocket(socket, reconnect)
    {
        const client_id = this.client_id++;
        this.clients[client_id] = {
            id: client_id,
            addr: socket._socket ? socket._socket.remoteAddress+':'+socket._socket.remotePort : '',
            socket,
            alive: true,
            watches: {},
        };
        socket.on('pong', () =>
        {
            // The client entry is removed on close, so it may already be gone
            const client = this.clients[client_id];
            if (client)
            {
                client.alive = true;
            }
        });
        socket.on('error', e => console.error(e.syscall === 'connect' ? e.message : e));
        const pinger = setInterval(() =>
        {
            const client = this.clients[client_id];
            if (!client)
            {
                return;
            }
            if (!client.alive)
            {
                // No pong since the previous ping: drop the connection
                return socket.terminate();
            }
            client.alive = false;
            socket.ping(() => {});
        }, this.cfg.ws_keepalive_interval||30000);
        socket.on('message', (msg) =>
        {
            try
            {
                msg = JSON.parse(msg);
            }
            catch (e)
            {
                socket.send(JSON.stringify({ error: 'bad-json' }));
                return;
            }
            if (!msg)
            {
                socket.send(JSON.stringify({ error: 'empty-message' }));
            }
            else
            {
                this.handleMessage(client_id, msg, socket);
            }
        });
        socket.on('close', () =>
        {
            this.unsubscribeClient(client_id);
            clearInterval(pinger);
            delete this.clients[client_id];
            socket.terminate();
            if (reconnect)
            {
                reconnect();
            }
        });
        return client_id;
    }

    /**
     * Route a parsed HTTP request to the matching handle_* method.
     *
     * Paths under /v3/ are mapped to method names by stripping the prefix and
     * trailing slashes and replacing the remaining slashes with underscores
     * (e.g. /v3/kv/txn -> handle_kv_txn). /dump is served by handle_dump().
     *
     * @param {http.IncomingMessage} req
     * @param {object} data - parsed JSON body
     * @param {http.ServerResponse} res - unused
     * @returns {Promise<*>} value to be serialized as the JSON reply
     * @throws {RequestError} 502 while stopping, 400 for an unparsable URL,
     *   405 for non-POST API calls, 404 for unknown paths
     */
    // eslint-disable-next-line no-unused-vars
    async runHandler(req, data, res)
    {
        // v3/kv/txn
        // v3/kv/range
        // v3/kv/put
        // v3/kv/deleterange
        // v3/lease/grant
        // v3/lease/keepalive
        // v3/lease/revoke O_o
        // v3/kv/lease/revoke O_o
        if (this.stopped)
        {
            throw new RequestError(502, 'Server is stopping');
        }
        let requestUrl;
        try
        {
            requestUrl = new URL(req.url, 'http://'+(req.headers.host || 'localhost'));
        }
        catch (e)
        {
            // Both the request target and the Host header come from the client
            throw new RequestError(400, 'invalid request URL');
        }
        if (requestUrl.pathname.startsWith('/v3/'))
        {
            const path = requestUrl.pathname.slice(4).replace(/\/+$/, '').replace(/\/+/g, '_');
            if (this.cluster)
            {
                // A truthy result from the cluster layer is returned as the reply
                const cluster_reply = await this.cluster.checkRaftState(path, requestUrl, data);
                if (cluster_reply)
                {
                    return cluster_reply;
                }
            }
            const cb = this['handle_'+path];
            if (typeof cb === 'function')
            {
                if (req.method !== 'POST')
                {
                    throw new RequestError(405, 'Please use POST method');
                }
                return await cb.call(this, data);
            }
        }
        else if (requestUrl.pathname === '/dump')
        {
            return this.handle_dump(data);
        }
        throw new RequestError(404, 'Supported APIs: /v3/kv/txn, /v3/kv/range, /v3/kv/put, /v3/kv/deleterange, '+
            '/v3/lease/grant, /v3/lease/revoke, /v3/kv/lease/revoke, /v3/lease/keepalive');
    }

    /** /v3/kv/txn: pass the request to EtcTree.api_txn() as is. */
    async handle_kv_txn(data)
    {
        return await this.etctree.api_txn(data);
    }

    /** /v3/kv/range: run a single request_range as a transaction and unwrap its response. */
    async handle_kv_range(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_range: data } ] });
        return { header: r.header, ...r.responses[0].response_range };
    }

    /** /v3/kv/put: run a single request_put as a transaction and unwrap its response. */
    async handle_kv_put(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_put: data } ] });
        return { header: r.header, ...r.responses[0].response_put };
    }

    /** /v3/kv/deleterange: run a single request_delete_range as a transaction and unwrap its response. */
    async handle_kv_deleterange(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_delete_range: data } ] });
        return { header: r.header, ...r.responses[0].response_delete_range };
    }

    /** /v3/lease/grant */
    handle_lease_grant(data)
    {
        return this.etctree.api_grant_lease(data);
    }

    /** /v3/lease/revoke */
    handle_lease_revoke(data)
    {
        return this.etctree.api_revoke_lease(data);
    }

    /** /v3/kv/lease/revoke: same operation as /v3/lease/revoke. */
    handle_kv_lease_revoke(data)
    {
        return this.etctree.api_revoke_lease(data);
    }

    /** /v3/lease/keepalive */
    handle_lease_keepalive(data)
    {
        return this.etctree.api_keepalive_lease(data);
    }

    /** /dump (also reachable as /v3/dump): EtcTree dump plus the stored term. */
    // eslint-disable-next-line no-unused-vars
    handle_dump(data)
    {
        return { ...this.etctree.dump(), term: this.stored_term };
    }

    /**
     * Handle one parsed websocket message: create_request, cancel_request and
     * progress_request are served locally, anything else is passed to the
     * cluster layer when clustering is enabled and ignored otherwise.
     *
     * @param {number} client_id - id returned by startWebsocket()
     * @param {object} msg - parsed JSON message
     * @param {object} socket - websocket to reply on
     */
    handleMessage(client_id, msg, socket)
    {
        const client = this.clients[client_id];
        if (!client)
        {
            // Client was already removed (socket closed); nothing to reply to
            return;
        }
        console.log(new Date().toISOString()+' '+client.addr+' '+(client.raft_node_id || '-')+' -> '+JSON.stringify(msg));
        if (msg.create_request)
        {
            const create_request = msg.create_request;
            // A request reusing a watch_id this client already has is ignored
            if (!create_request.watch_id || !client.watches[create_request.watch_id])
            {
                const watch = this.etctree.api_create_watch(
                    { ...create_request, watch_id: null }, (event) => socket.send(JSON.stringify(event))
                );
                if (!watch.created)
                {
                    socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, ...watch } }));
                }
                else
                {
                    // Clients see their own watch_id; it is mapped to the id assigned by EtcTree
                    create_request.watch_id = create_request.watch_id || watch.watch_id;
                    client.watches[create_request.watch_id] = watch.watch_id;
                    socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, created: true } }));
                }
            }
        }
        else if (msg.cancel_request)
        {
            const mapped_id = client.watches[msg.cancel_request.watch_id];
            if (mapped_id)
            {
                this.etctree.api_cancel_watch({ watch_id: mapped_id });
                delete client.watches[msg.cancel_request.watch_id];
                socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: msg.cancel_request.watch_id, canceled: true } }));
            }
        }
        else if (msg.progress_request)
        {
            socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision } } }));
        }
        else
        {
            if (!this.cluster)
            {
                return;
            }
            this.cluster.handleWsMsg(client, msg);
        }
    }

    /**
     * Cancel every watch registered by the given client. Does nothing for an
     * unknown client id.
     *
     * @param {number} client_id
     */
    unsubscribeClient(client_id)
    {
        const client = this.clients[client_id];
        if (!client)
        {
            return;
        }
        for (const watch_id in client.watches)
        {
            const mapped_id = client.watches[watch_id];
            this.etctree.api_cancel_watch({ watch_id: mapped_id });
        }
    }
}
|
|
|
|
/**
 * Build a (key, value) filter that decides what is kept for a Vitastor key
 * space rooted at `prefix`.
 *
 * The returned function gives back:
 * - for keys under <prefix>/osd/stats/: a JSON string reduced to the fields
 *   bitmap_granularity, data_block_size, host and immediate_commit (falsy
 *   fields are omitted), undefined for an empty value, or an empty object
 *   when the value is not usable JSON (the problem is logged);
 * - undefined for other keys under <prefix>/osd/, <prefix>/inode/stats/,
 *   <prefix>/pg/stats/, <prefix>/pool/stats/ and for the key <prefix>/stats;
 * - the value unchanged for every other key.
 *
 * @param {string} prefix - key prefix, e.g. '/vitastor'
 * @returns {function(string, *): *}
 */
function vitastor_persist_filter(prefix)
{
    const osd_stats_prefix = prefix+'/osd/stats/';
    const skipped_prefixes = [
        prefix+'/osd/',
        prefix+'/inode/stats/',
        prefix+'/pg/stats/',
        prefix+'/pool/stats/',
    ];
    const skipped_key = prefix+'/stats';
    const kept_fields = [ 'bitmap_granularity', 'data_block_size', 'host', 'immediate_commit' ];
    return (key, value) =>
    {
        // Must be tested before the generic <prefix>/osd/ rule below
        if (key.startsWith(osd_stats_prefix))
        {
            if (!value)
            {
                return undefined;
            }
            let current = value;
            try
            {
                current = JSON.parse(current);
                const reduced = {};
                for (const field of kept_fields)
                {
                    reduced[field] = current[field] || undefined;
                }
                return JSON.stringify(reduced);
            }
            catch (e)
            {
                console.error('invalid JSON in '+key+' = '+current+': '+e);
                return {};
            }
        }
        const skipped = key === skipped_key || skipped_prefixes.some(p => key.startsWith(p));
        return skipped ? undefined : value;
    };
}
|
|
|
|
// Text printed for -h / --help. Option names are shown with underscores;
// parse() also accepts the same names written with dashes.
const help_text = `Miniature etcd replacement based on TinyRaft
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0

Usage:

${process.argv[0]} ${process.argv[1]}
  [--cert ssl.crt] [--key ssl.key] [--port 12379]
  [--data data.gz] [--vitastor-persist-filter /vitastor] [--no-persist-filter] [--persist_interval 500]
  [--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381]
  [other options]

Supported etcd REST APIs:

/v3/kv/txn /v3/kv/put /v3/kv/range /v3/kv/deleterange
/v3/lease/grant /v3/lease/keepalive /v3/lease/revoke /v3/kv/lease/revoke
websocket-based watch API (create_request, cancel_request, progress_request)

Options:

HTTP:

--port 2379
  Listen port
--cert <filename>
  Use TLS with this certificate file (PEM format)
--key <filename>
  Use TLS with this key file (PEM format)
--ws_keepalive_interval 30000
  Client websocket ping (keepalive) interval in milliseconds

Persistence:

--data <filename>
  Use <filename> to store persistent data
--persist_interval <milliseconds>
  Persist data on disk after this interval, not immediately
--no_persist_filter
  Store all data
--vitastor_persist_filter <prefix>
  Store only data required for Vitastor with prefix <prefix> on disk

Clustering:

--node_id <id>
  ID of this cluster node
--cluster <id1>=<url1>,<id2>=<url2>,...
  All other cluster nodes
--cluster_key <key>
  Shared cluster key for identification
--election_timeout 5000
  Raft election timeout
--heartbeat_timeout 1000
  Raft leader heartbeat timeout
--leader_priority <number>
  Raft leader priority for this node (optional)
--stale_read 0|1
  Allow to serve reads from followers
--reconnect_interval 1000
  Unavailable peer connection retry interval
--dump_timeout 5000
  Timeout for dump command in milliseconds
--load_timeout 5000
  Timeout for load command in milliseconds
--forward_timeout 1000
  Timeout for forwarding requests from follower to leader in milliseconds
--replication_timeout 1000
  Timeout for replicating requests from leader to follower in milliseconds
--compact_revisions 1000
  Number of previous revisions to keep deletion information in memory
--compact_timeout 1000
  Timeout for compaction requests from leader to follower in milliseconds
`;
|
|
|
|
/**
 * Turn process.argv into an options object.
 *
 * Option names are lowercased and dashes after the leading "--" become
 * underscores, so "--No-Persist-Filter" and "--no_persist_filter" are the
 * same option. Rules:
 * - -h / --help prints the help text to stderr and exits;
 * - --no_persist_filter sets persist_filter to null;
 * - --vitastor_persist_filter <prefix> replaces persist_filter with a filter
 *   for that prefix (empty prefix when the value is missing);
 * - any other "--name value" pair is stored as options[name] = value, with
 *   the value taken verbatim from the next argument;
 * - "--persist_filter" itself and arguments not starting with "--" are
 *   skipped without consuming a value.
 *
 * @returns {object} options; persist_filter defaults to the '/vitastor' filter
 */
function parse()
{
    const argv = process.argv;
    const options = {
        persist_filter: vitastor_persist_filter('/vitastor'),
    };
    for (let pos = 2; pos < argv.length; pos++)
    {
        const opt = argv[pos].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
        if (opt === '-h' || opt === '--help')
        {
            console.error(help_text.trim());
            process.exit();
        }
        else if (opt === '--no_persist_filter')
        {
            options.persist_filter = null;
        }
        else if (opt === '--vitastor_persist_filter')
        {
            pos++;
            options.persist_filter = vitastor_persist_filter(argv[pos] || '');
        }
        else if (opt.startsWith('--') && opt !== '--persist_filter')
        {
            const name = opt.slice(2);
            pos++;
            options[name] = argv[pos];
        }
    }
    return options;
}
|
|
|
|
// Entry point: build the server from command-line options and start it;
// a failure of run() is only reported on stderr.
const antietcd_instance = new AntiEtcd(parse());
antietcd_instance.run().catch(console.error);
|