135 lines
3.8 KiB
JavaScript
135 lines
3.8 KiB
JavaScript
const fs = require('fs');
|
|
const fsp = require('fs').promises;
|
|
const zlib = require('zlib');
|
|
|
|
const stableStringify = require('./stable-stringify.js');
|
|
const EtcTree = require('./etctree.js');
|
|
const { de64, runCallbacks } = require('./common.js');
|
|
|
|
class AntiPersistence
|
|
{
|
|
constructor(antietcd)
|
|
{
|
|
this.cfg = antietcd.cfg;
|
|
this.antietcd = antietcd;
|
|
this.prev_value = {};
|
|
this.persist_timer = null;
|
|
this.wait_persist = null;
|
|
}
|
|
|
|
async load()
|
|
{
|
|
// eslint-disable-next-line no-unused-vars
|
|
const [ err, stat ] = await new Promise(ok => fs.stat(this.cfg.data, (err, stat) => ok([ err, stat ])));
|
|
if (!err)
|
|
{
|
|
let data = await fsp.readFile(this.cfg.data);
|
|
data = await new Promise((ok, no) => zlib.gunzip(data, (err, res) => err ? no(err) : ok(res)));
|
|
data = JSON.parse(data);
|
|
this.loading = true;
|
|
this.antietcd.etctree.load(data);
|
|
this.loading = false;
|
|
this.antietcd.stored_term = data['term'] || 0;
|
|
}
|
|
else if (err.code != 'ENOENT')
|
|
{
|
|
throw err;
|
|
}
|
|
}
|
|
|
|
async persistChange(msg)
|
|
{
|
|
if (this.loading)
|
|
{
|
|
return;
|
|
}
|
|
if (!msg.events || !msg.events.length)
|
|
{
|
|
// lease-only changes don't need to be persisted
|
|
return;
|
|
}
|
|
if (this.cfg.persist_filter)
|
|
{
|
|
let changed = false;
|
|
for (const ev of msg.events)
|
|
{
|
|
if (ev.lease)
|
|
{
|
|
// Values with lease are never persisted
|
|
const key = de64(ev.key);
|
|
if (this.prev_value[key] !== undefined)
|
|
{
|
|
delete this.prev_value[key];
|
|
changed = true;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
const key = de64(ev.key);
|
|
const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value));
|
|
if (!EtcTree.eq(filtered, this.prev_value[key]))
|
|
{
|
|
this.prev_value[key] = filtered;
|
|
changed = true;
|
|
}
|
|
}
|
|
}
|
|
if (!changed)
|
|
{
|
|
return;
|
|
}
|
|
}
|
|
await this.schedulePersist();
|
|
}
|
|
|
|
async schedulePersist()
|
|
{
|
|
if (!this.cfg.persist_interval)
|
|
{
|
|
await this.persist();
|
|
return;
|
|
}
|
|
if (!this.persist_timer)
|
|
{
|
|
this.persist_timer = setTimeout(() =>
|
|
{
|
|
this.persist_timer = null;
|
|
this.persist().catch(console.error);
|
|
}, this.cfg.persist_interval);
|
|
}
|
|
}
|
|
|
|
async persist()
|
|
{
|
|
if (!this.cfg.data)
|
|
{
|
|
return;
|
|
}
|
|
while (this.wait_persist)
|
|
{
|
|
await new Promise(ok => this.wait_persist.push(ok));
|
|
}
|
|
this.wait_persist = [];
|
|
try
|
|
{
|
|
let dump = this.antietcd.etctree.dump(true);
|
|
dump['term'] = this.antietcd.stored_term;
|
|
dump = stableStringify(dump);
|
|
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
|
|
const fh = await fsp.open(this.cfg.data+'.tmp', 'w');
|
|
await fh.writeFile(dump);
|
|
await fh.sync();
|
|
await fh.close();
|
|
await fsp.rename(this.cfg.data+'.tmp', this.cfg.data);
|
|
}
|
|
catch (e)
|
|
{
|
|
console.error('Error persisting data to disk: '+e);
|
|
process.exit(1);
|
|
}
|
|
runCallbacks(this, 'wait_persist', null);
|
|
}
|
|
}
|
|
|
|
module.exports = AntiPersistence;
|