diff --git a/antietcd.js b/antietcd.js index 2198d5b..e61157a 100644 --- a/antietcd.js +++ b/antietcd.js @@ -1,7 +1,10 @@ const fs = require('fs'); +const fsp = require('fs').promises; const { URL } = require('url'); const http = require('http'); const https = require('https'); +const crypto = require('crypto'); +const zlib = require('zlib'); const ws = require('ws'); @@ -19,27 +22,165 @@ class RequestError class AntiEtcd { constructor(cfg) - { - this.cfg = cfg; - } - - run() { this.clients = {}; this.client_id = 1; this.etctree = new EtcTree(true); + this.cfg = cfg; + this.loading = false; + this.stopped = false; + this.inflight = 0; + this.waitInflight = []; + this.prevValue = {}; + } + + async run() + { + if (this.cfg.filename) + { + // Set persistence event listener + this.persistence_watch = this.etctree.api_create_watch({ key: b64('/'), range_end: b64('0') }, msg => this.persistChange(msg)); + // Load data from disk + const [ err, stat ] = await new Promise(ok => fs.stat(this.cfg.filename, (err, stat) => ok([ err, stat ]))); + if (!err) + { + let data = await fsp.readFile(this.cfg.filename); + data = await new Promise((ok, no) => zlib.gunzip(data, (err, res) => err ? no(err) : ok(res))); + data = JSON.parse(data); + this.loading = true; + this.etctree.load(data); + this.loading = false; + } + else if (err.code != 'ENOENT') + { + throw err; + } + // Set exit hook + const on_stop_cb = () => this.on_stop(); + process.on('SIGINT', on_stop_cb); + process.on('SIGTERM', on_stop_cb); + process.on('SIGQUIT', on_stop_cb); + } if (this.cfg.cert) { - this.tls = { key: fs.readFileSync(this.cfg.key), cert: fs.readFileSync(this.cfg.cert) }; + this.tls = { key: await fsp.readFile(this.cfg.key), cert: await fsp.readFile(this.cfg.cert) }; this.server = https.createServer(this.tls, (req, res) => this.handleRequest(req, res)); } else + { this.server = http.createServer((req, res) => this.handleRequest(req, res)); + } this.wss = new ws.WebSocketServer({ server: this.server }); this.wss.on('connection', (conn, req) => this.startWebsocket(conn, req)); this.server.listen(this.cfg.port || 2379); } + async on_stop() + { + if (this.stopped) + { + return; + } + this.stopped = true; + // Wait until all requests complete + while (this.inflight > 0) + { + await new Promise(ok => this.waitInflight.push(ok)); + } + await this.persist(); + process.exit(0); + } + + persistChange(msg) + { + // FIXME: Persist and replicate change before sending the event to listeners if persist_interval is 0 + if (!this.cfg.persist_filter) + { + this.schedulePersist(); + return; + } + let changed = false; + for (const ev of msg.result.events) + { + if (ev.kv.lease) + { + // Values with lease are never persisted + const key = de64(ev.kv.key); + if (this.prevValue[key] !== undefined) + { + delete this.prevValue[key]; + changed = true; + } + } + else + { + const key = de64(ev.kv.key); + const filtered = this.cfg.persist_filter(key, ev.type === 'DELETE' ? undefined : de64(ev.kv.value)); + if (!EtcTree.eq(filtered, this.prevValue[key])) + { + this.prevValue[key] = filtered; + changed = true; + } + } + changed = true; + } + if (changed && !this.loading) + { + this.schedulePersist(); + } + } + + schedulePersist() + { + if (!this.persistTimer) + { + this.persistTimer = setTimeout(() => + { + this.persistTimer = null; + this.persist().catch(console.error); + }, this.cfg.persist_interval||500); + } + } + + async persist() + { + if (!this.cfg.filename) + { + return; + } + if (this.persisting) + { + await new Promise(ok => this.persisting.push(ok)); + } + let err; + this.persisting = []; + try + { + let dump = this.etctree.dump(true); + dump = JSON.stringify(dump); + dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res))); + const fh = await fsp.open(this.cfg.filename+'.tmp', 'w'); + await fh.writeFile(dump); + await fh.sync(); + await fh.close(); + await fsp.rename(this.cfg.filename+'.tmp', this.cfg.filename); + } + catch (e) + { + err = e; + } + const cbs = this.persisting; + this.persisting = null; + for (const cb of cbs) + { + cb(); + } + if (err) + { + throw err; + } + } + fail(res, code, text) { res.writeHead(code); @@ -51,8 +192,9 @@ class AntiEtcd { let data = []; req.on('data', (chunk) => data.push(chunk)); - req.on('end', () => + req.on('end', async () => { + this.inflight++; data = Buffer.concat(data); let body = ''; let code = 200; @@ -76,7 +218,7 @@ class AntiEtcd { throw new RequestError(400, 'body should be JSON object'); } - reply = this.runHandler(req, data, res); + reply = await this.runHandler(req, data, res); reply = JSON.stringify(reply); } catch (e) @@ -93,21 +235,38 @@ class AntiEtcd reply = 'Internal error: '+e.message; } } - // Access log - console.log( - new Date().toISOString()+ - ' '+(req.headers['x-forwarded-for'] || req.socket.remoteAddress)+ - ' '+req.method+' '+req.url+' '+code+'\n '+body.replace(/\n/g, '\\n')+ - '\n '+reply.replace(/\n/g, '\\n') - ); - // FIXME: Access log :req[X-Forwarded-For] [:date[clf]] pid=:pid ":method :url HTTP/:http-version" :status :res[content-length] ":referrer" ":user-agent" - :response-time ms - reply = Buffer.from(reply); - res.writeHead(200, { - 'Content-Type': 'application/json', - 'Content-Length': reply.length, - }); - res.write(reply); - res.end(); + try + { + // Access log + console.log( + new Date().toISOString()+ + ' '+(req.headers['x-forwarded-for'] || req.socket.remoteAddress)+ + ' '+req.method+' '+req.url+' '+code+'\n '+body.replace(/\n/g, '\\n')+ + '\n '+reply.replace(/\n/g, '\\n') + ); + // FIXME: Access log :req[X-Forwarded-For] [:date[clf]] pid=:pid ":method :url HTTP/:http-version" :status :res[content-length] ":referrer" ":user-agent" - :response-time ms + reply = Buffer.from(reply); + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Content-Length': reply.length, + }); + res.write(reply); + res.end(); + } + catch (e) + { + console.error(e); + } + this.inflight--; + if (!this.inflight && this.waitInflight.length) + { + const cbs = this.waitInflight; + this.waitInflight = []; + for (const cb of cbs) + { + cb(); + } + } }); } @@ -163,7 +322,7 @@ class AntiEtcd }); } - runHandler(req, data, res) + async runHandler(req, data, res) { // v3/kv/txn // v3/kv/range @@ -173,6 +332,10 @@ class AntiEtcd // v3/lease/keepalive // v3/lease/revoke O_o // v3/kv/lease/revoke O_o + if (this.stopped) + { + throw new RequestError(502, 'Server is stopping'); + } const requestUrl = new URL(req.url, 'http://'+(req.headers.host || 'localhost')); if (requestUrl.pathname.substr(0, 4) == '/v3/') { @@ -187,6 +350,10 @@ class AntiEtcd return cb.call(this, data); } } + else if (requestUrl.pathname == '/dump') + { + return this.handle_dump(data); + } throw new RequestError(404, 'Supported APIs: /v3/kv/txn, /v3/kv/range, /v3/kv/put, /v3/kv/deleterange, '+ '/v3/lease/grant, /v3/lease/revoke, /v3/kv/lease/revoke, /v3/lease/keepalive'); } @@ -234,6 +401,11 @@ class AntiEtcd return this.etctree.api_keepalive_lease(data); } + handle_dump(data) + { + return this.etctree.dump(); + } + handleMessage(client_id, msg, socket) { if (msg.create_request) @@ -285,4 +457,60 @@ class AntiEtcd } } -new AntiEtcd({ port: 12379 }).run(); +function vitastor_persist_filter(prefix) +{ + return (key, value) => + { + if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/') + { + if (value) + { + try + { + value = JSON.parse(value); + value = JSON.stringify({ + bitmap_granularity: value.bitmap_granularity || undefined, + data_block_size: value.data_block_size || undefined, + host: value.host || undefined, + immediate_commit: value.immediate_commit || undefined, + }); + } + catch (e) + { + console.error('invalid JSON in '+key+' = '+value+': '+e); + value = {}; + } + } + else + { + value = undefined; + } + return value; + } + else if (key.substr(0, prefix.length+'/osd/'.length) == prefix+'/osd/' || + key.substr(0, prefix.length+'/inode/stats/'.length) == prefix+'/inode/stats/' || + key.substr(0, prefix.length+'/pg/stats/'.length) == prefix+'/pg/stats/' || + key.substr(0, prefix.length+'/pool/stats/'.length) == prefix+'/pool/stats/' || + key == prefix+'/stats') + { + return undefined; + } + return value; + }; +} + +function de64(k) +{ + if (k == null) // null or undefined + return k; + return Buffer.from(k, 'base64').toString(); +} + +function b64(k) +{ + if (k == null) // null or undefined + return k; + return Buffer.from(k).toString('base64'); +} + +new AntiEtcd({ port: 12379, persist_filter: vitastor_persist_filter('/vitastor'), filename: __dirname+'/data.gz' }).run().catch(console.error); diff --git a/etctree.js b/etctree.js index f6a98e8..64b3f5b 100644 --- a/etctree.js +++ b/etctree.js @@ -135,10 +135,10 @@ class EtcTree } // create a snapshot of all data including leases - dump(persistent_only) + dump(persistent_only, value_filter) { const snapshot = { - state: this._copy_tree(this.state, persistent_only) || {}, + state: this._copy_tree(this.state, persistent_only, value_filter) || {}, mod_revision: this.mod_revision, }; if (!persistent_only) @@ -153,17 +153,27 @@ class EtcTree return snapshot; } - _copy_tree(cur, no_lease) + _copy_tree(cur, no_lease, value_filter) { - const nonempty = cur.value != null && (!no_lease || !copy.lease); + let nonempty = cur.value != null && (!no_lease || !cur.lease); + let filtered; + if (nonempty && value_filter) + { + filtered = value_filter(cur.value); + nonempty = nonempty && filtered != null; + } const copy = (nonempty ? { ...cur } : {}); copy.children = {}; + if (nonempty && value_filter) + { + copy.value = filtered; + } delete copy.watchers; delete copy.key_watchers; let has_children = false; for (const k in cur.children) { - const child = this._copy_tree(cur.children[k]); + const child = this._copy_tree(cur.children[k], no_lease, value_filter); if (child) { copy.children[k] = child; @@ -437,13 +447,19 @@ class EtcTree { if (cur.mod_revision >= min_rev) { - events.push({ + const ev = { type: cur.value == null ? 'DELETE' : 'PUT', kv: cur.value == null ? { key: this.b64(prefix === null ? '' : prefix) } : { key: this.b64(prefix), value: this.b64(cur.value), + mod_revision: cur.mod_revision, }, - }); + }; + if (cur.lease) + { + ev.kv.lease = cur.lease; + } + events.push(ev); } if (cur.children) {