tinyraft/antipersistence.js

135 lines
3.8 KiB
JavaScript

const fs = require('fs');
const fsp = require('fs').promises;
const zlib = require('zlib');
const stableStringify = require('./stable-stringify.js');
const EtcTree = require('./etctree.js');
const { de64, runCallbacks } = require('./common.js');
class AntiPersistence
{
constructor(antietcd)
{
this.cfg = antietcd.cfg;
this.antietcd = antietcd;
this.prev_value = {};
this.persist_timer = null;
this.wait_persist = null;
}
async load()
{
// eslint-disable-next-line no-unused-vars
const [ err, stat ] = await new Promise(ok => fs.stat(this.cfg.data, (err, stat) => ok([ err, stat ])));
if (!err)
{
let data = await fsp.readFile(this.cfg.data);
data = await new Promise((ok, no) => zlib.gunzip(data, (err, res) => err ? no(err) : ok(res)));
data = JSON.parse(data);
this.loading = true;
this.antietcd.etctree.load(data);
this.loading = false;
this.antietcd.stored_term = data['term'] || 0;
}
else if (err.code != 'ENOENT')
{
throw err;
}
}
async persistChange(msg)
{
if (this.loading)
{
return;
}
if (!msg.events || !msg.events.length)
{
// lease-only changes don't need to be persisted
return;
}
if (this.cfg.persist_filter)
{
let changed = false;
for (const ev of msg.events)
{
if (ev.lease)
{
// Values with lease are never persisted
const key = de64(ev.key);
if (this.prev_value[key] !== undefined)
{
delete this.prev_value[key];
changed = true;
}
}
else
{
const key = de64(ev.key);
const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value));
if (!EtcTree.eq(filtered, this.prev_value[key]))
{
this.prev_value[key] = filtered;
changed = true;
}
}
}
if (!changed)
{
return;
}
}
await this.schedulePersist();
}
async schedulePersist()
{
if (!this.cfg.persist_interval)
{
await this.persist();
return;
}
if (!this.persist_timer)
{
this.persist_timer = setTimeout(() =>
{
this.persist_timer = null;
this.persist().catch(console.error);
}, this.cfg.persist_interval);
}
}
async persist()
{
if (!this.cfg.data)
{
return;
}
while (this.wait_persist)
{
await new Promise(ok => this.wait_persist.push(ok));
}
this.wait_persist = [];
try
{
let dump = this.antietcd.etctree.dump(true);
dump['term'] = this.antietcd.stored_term;
dump = stableStringify(dump);
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
const fh = await fsp.open(this.cfg.data+'.tmp', 'w');
await fh.writeFile(dump);
await fh.sync();
await fh.close();
await fsp.rename(this.cfg.data+'.tmp', this.cfg.data);
}
catch (e)
{
console.error('Error persisting data to disk: '+e);
process.exit(1);
}
runCallbacks(this, 'wait_persist', null);
}
}
module.exports = AntiPersistence;