tinyraft/etctree.js

866 lines
26 KiB
JavaScript

const crypto = require('crypto');
const stableStringify = require('./stable-stringify.js');
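/*type TreeNode = {
value?: any,
create_revision?: number,
mod_revision?: number,
version?: number,
lease?: string,
children?: { [string]: TreeNode },
watchers?: number[] | null, // ids of watches notified about changes anywhere in this subtree
key_watchers?: number[] | null, // ids of watches notified only about changes of this exact key
};*/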
class EtcTree
{
    /**
     * In-memory etcd-like key-value tree with transactions, leases, watches,
     * compaction, snapshots and replication hooks.
     *
     * @param {boolean} use_base64 - when true, keys and values received by the api_* methods
     *   and emitted in responses / watch events are base64-encoded; the tree stores them decoded.
     */
    constructor(use_base64)
    {
        this.state = {};
        // lease id -> { ttl, expires, timer_id, keys: { decoded_key: true } }
        this.leases = {};
        // watch id -> { paths: string[][], send }
        this.watchers = {};
        this.watcher_id = 0;
        this.mod_revision = 0;
        this.compact_revision = 0;
        this.use_base64 = use_base64;
        this.replicate = null;
        // When true, lease expiry timers are not armed (see pause_leases())
        this.paused = false;
        // Pending setImmediate() handles for initial watch events, cancelled by destroy()
        this.active_immediate = [];
    }

    /**
     * Stop all lease expiry timers and cancel pending initial watch deliveries.
     */
    destroy()
    {
        this.pause_leases();
        for (const imm of this.active_immediate)
        {
            clearImmediate(imm);
        }
        this.active_immediate = [];
    }

    set_replicate_watcher(replicate)
    {
        // Replication watcher is special:
        // It should be an async function and it is called BEFORE notifying all
        // other watchers about any change.
        // It may also throw to prevent notifying at all if replication fails.
        this.replicate = replicate;
    }

    /** Decode a key/value from base64 when use_base64 is set; null/undefined pass through. */
    de64(k)
    {
        if (k == null) // null or undefined
            return k;
        return this.use_base64 ? Buffer.from(k, 'base64').toString() : k;
    }

    /** Encode a key/value to base64 when use_base64 is set; null/undefined pass through. */
    b64(k)
    {
        if (k == null) // null or undefined
            return k;
        return this.use_base64 ? Buffer.from(k).toString('base64') : k;
    }

    /**
     * Evaluate one transaction comparison against the current tree.
     *
     * @param {object} chk - { key, target: 'MOD'|'CREATE'|'VERSION'|'LEASE', result?,
     *   mod_revision?, create_revision?, version?, lease? }. result is 'EQUAL' (also the
     *   default when omitted), 'NOT_EQUAL', 'LESS' or 'GREATER'.
     * @returns {boolean} whether the comparison holds
     * @throws {Error} on an unsupported target or result
     */
    _check(chk)
    {
        const parts = this._key_parts(this.de64(chk.key));
        const { cur } = this._get_subtree(parts, false, false);
        // Deleted keys stay in the tree as tombstones (value == null) until compaction;
        // they must compare like absent keys
        const exists = cur && cur.value != null;
        let check_value, ref_value;
        if (chk.target === 'MOD')
        {
            check_value = exists && cur.mod_revision || 0;
            ref_value = chk.mod_revision || 0;
        }
        else if (chk.target === 'CREATE')
        {
            check_value = exists && cur.create_revision || 0;
            ref_value = chk.create_revision || 0;
        }
        else if (chk.target === 'VERSION')
        {
            check_value = exists && cur.version || 0;
            ref_value = chk.version || 0;
        }
        else if (chk.target === 'LEASE')
        {
            check_value = exists ? cur.lease : undefined;
            ref_value = chk.lease;
        }
        else
        {
            throw new Error('Unsupported comparison target: '+chk.target);
        }
        switch (chk.result || 'EQUAL')
        {
        case 'EQUAL':
            return check_value == ref_value;
        case 'NOT_EQUAL':
            return check_value != ref_value;
        case 'LESS':
            return check_value < ref_value;
        case 'GREATER':
            return check_value > ref_value;
        default:
            throw new Error('Unsupported comparison result: '+chk.result);
        }
    }

    /**
     * Split a decoded key into path components. Repeated slashes are collapsed and a
     * trailing slash is dropped; a leading slash yields an empty first component.
     * An empty key (or just '/') gives [].
     */
    _key_parts(key)
    {
        const parts = key.replace(/\/\/+/g, '/').replace(/\/$/g, ''); // trim beginning?
        return parts === '' ? [] : parts.split('/');
    }

    /**
     * Parse { key, range_end } of a request. Only single keys (no range_end) and
     * directory ranges of the form key='<prefix>/', range_end='<prefix>0' are supported.
     *
     * @returns {{ parts: string[], all: boolean }} all=true for a directory range
     * @throws {Error} for any other range
     */
    _get_range(req)
    {
        const key = this.de64(req.key);
        const end = this.de64(req.range_end);
        if (end != null && (key[key.length-1] != '/' || end[end.length-1] != '0' ||
            end.substr(0, end.length-1) !== key.substr(0, key.length-1)))
        {
            throw new Error('Non-directory range queries are unsupported');
        }
        const parts = this._key_parts(key);
        return { parts, all: end != null };
    }

    /**
     * Walk the tree along `parts`.
     *
     * @param {string[]} parts - path components
     * @param {boolean} create - create missing nodes; otherwise return {} when the path is absent
     * @param {boolean} notify - also collect subtree `watchers` of every node on the path
     *   (including the final one); key_watchers are NOT collected here
     * @returns {{ watchers: number[]|null, cur: object }} or {} when not found and !create
     */
    _get_subtree(parts, create, notify)
    {
        let cur = this.state;
        let watchers = notify ? [] : null;
        for (let k of parts)
        {
            if (notify && cur.watchers)
            {
                watchers.push.apply(watchers, cur.watchers);
            }
            if (!cur.children)
            {
                if (!create)
                {
                    return {};
                }
                cur.children = {};
            }
            if (!cur.children[k])
            {
                if (!create)
                {
                    return {};
                }
                cur.children[k] = {};
            }
            cur = cur.children[k];
        }
        if (notify && cur.watchers)
        {
            watchers.push.apply(watchers, cur.watchers);
        }
        return { watchers, cur };
    }

    /**
     * Create a snapshot of all data (and leases unless persistent_only).
     *
     * @param {boolean} persistent_only - skip leased keys and leases
     * @param {function} [value_filter] - maps each value; keys mapped to null are skipped
     */
    dump(persistent_only, value_filter)
    {
        const snapshot = {
            state: this._copy_tree(this.state, persistent_only, value_filter) || {},
            mod_revision: this.mod_revision,
            compact_revision: this.compact_revision,
        };
        if (!persistent_only)
        {
            snapshot.leases = {};
            for (const id in this.leases)
            {
                const lease = this.leases[id];
                snapshot.leases[id] = { ttl: lease.ttl, expires: lease.expires };
            }
        }
        return snapshot;
    }

    /**
     * Copy a subtree without watcher lists and without tombstones (value == null).
     * Returns null when nothing is left to copy.
     */
    _copy_tree(cur, no_lease, value_filter)
    {
        let nonempty = cur.value != null && (!no_lease || !cur.lease);
        let filtered;
        if (nonempty && value_filter)
        {
            filtered = value_filter(cur.value);
            nonempty = nonempty && filtered != null;
        }
        const copy = (nonempty ? { ...cur } : {});
        copy.children = {};
        if (nonempty && value_filter)
        {
            copy.value = filtered;
        }
        delete copy.watchers;
        delete copy.key_watchers;
        let has_children = false;
        for (const k in cur.children)
        {
            const child = this._copy_tree(cur.children[k], no_lease, value_filter);
            if (child)
            {
                copy.children[k] = child;
                has_children = true;
            }
        }
        if (!nonempty && !has_children)
        {
            return null;
        }
        if (!has_children)
        {
            delete copy.children;
        }
        return copy;
    }

    /**
     * Load a snapshot produced by dump() and notify watchers about the differences.
     *
     * @param {object} snapshot - { state, mod_revision, compact_revision?, leases? }
     * @param {boolean} update_only - only apply nodes newer than the current ones and never
     *   delete or revoke anything; otherwise make the tree an exact copy of the snapshot
     */
    load(snapshot, update_only)
    {
        if (!update_only || this.mod_revision < snapshot.mod_revision)
        {
            this.mod_revision = snapshot.mod_revision;
        }
        // NOTE(review): with update_only this lowers compact_revision to the snapshot's value
        // when ours is higher - confirm that taking the minimum is intended
        if (!update_only || this.compact_revision > (snapshot.compact_revision||0))
        {
            this.compact_revision = snapshot.compact_revision||0;
        }
        // First apply leases
        const notifications = [];
        if (!update_only && snapshot.leases)
        {
            for (const id in this.leases)
            {
                if (!snapshot.leases[id])
                {
                    // Revoke without replicating and notifying
                    this._sync_revoke_lease(id, notifications, this.mod_revision);
                }
            }
        }
        for (const id in snapshot.leases||{})
        {
            this.load_lease({ id, ...snapshot.leases[id] });
        }
        // Then find and apply the difference in data
        this._restore_diff(update_only, this.state, snapshot.state, null, this.state.watchers || [], notifications);
        this._notify(notifications);
    }

    /**
     * Recursively merge snapshot node cur_new into tree node cur_old, collecting
     * watcher notifications for changed values.
     *
     * @param {string|null} prefix - decoded key of cur_old (null for the root)
     * @param {number[]} watchers - subtree watchers of cur_old and all its ancestors
     */
    _restore_diff(update_only, cur_old, cur_new, prefix, watchers, notifications)
    {
        if (!update_only || !cur_old.mod_revision || cur_old.mod_revision < cur_new.mod_revision)
        {
            const key = prefix === null ? '' : prefix;
            if (!eq(cur_old.lease, cur_new.lease))
            {
                if (cur_old.lease && this.leases[cur_old.lease])
                {
                    delete this.leases[cur_old.lease].keys[key];
                }
                cur_old.lease = cur_new.lease;
                if (cur_new.lease && this.leases[cur_new.lease])
                {
                    this.leases[cur_new.lease].keys[key] = true;
                }
            }
            cur_old.mod_revision = cur_new.mod_revision;
            cur_old.create_revision = cur_new.create_revision;
            cur_old.version = cur_new.version;
            if (!eq(cur_old.value, cur_new.value))
            {
                cur_old.value = cur_new.value;
                // Same shape and encoding as the events from _put() / _delete_all():
                // key/value are base64-encoded when use_base64 is set, and an event
                // without a value is reported as DELETE by _notify()
                const notify = {
                    watchers: cur_old.key_watchers ? [ ...watchers, ...cur_old.key_watchers ] : watchers,
                    key: this.b64(key),
                    mod_revision: cur_new.mod_revision,
                };
                if (cur_new.value != null)
                {
                    notify.value = this.b64(cur_new.value);
                    if (cur_new.lease)
                    {
                        notify.lease = cur_new.lease;
                    }
                }
                notifications.push(notify);
            }
        }
        cur_old.children = cur_old.children || {};
        for (const k in cur_new.children)
        {
            if (!cur_old.children[k])
            {
                // Build new subtrees node by node instead of linking the snapshot's objects:
                // this registers their leases, notifies ancestor watchers about the new keys
                // and doesn't alias the caller's snapshot
                cur_old.children[k] = {};
            }
            const child = cur_old.children[k];
            this._restore_diff(
                update_only, child, cur_new.children[k],
                prefix === null ? k : prefix+'/'+k,
                child.watchers ? [ ...watchers, ...child.watchers ] : watchers,
                notifications
            );
        }
        if (!update_only)
        {
            for (const k in cur_old.children)
            {
                if (!cur_new.children || !cur_new.children[k])
                {
                    // Delete subtree
                    this._delete_all(
                        notifications,
                        cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
                        cur_old.children[k], true,
                        prefix === null ? k : prefix+'/'+k,
                        this.mod_revision
                    );
                }
            }
        }
    }

    // slave/follower nodes don't expire leases themselves, they listen for the leader instead
    pause_leases()
    {
        if (this.paused)
        {
            return;
        }
        this.paused = true;
        for (const id in this.leases)
        {
            const lease = this.leases[id];
            if (lease.timer_id)
            {
                clearTimeout(lease.timer_id);
                lease.timer_id = null;
            }
        }
    }

    /** Re-arm expiry timers of all leases after pause_leases(). */
    resume_leases()
    {
        if (!this.paused)
        {
            return;
        }
        this.paused = false;
        for (const id in this.leases)
        {
            this._set_expire(id);
        }
    }

    /**
     * Arm the expiry timer of lease `id` unless leases are paused or a timer is already set.
     * On expiry the lease is revoked through api_revoke_lease() (replicated and notified).
     */
    _set_expire(id)
    {
        if (this.paused)
        {
            return;
        }
        const lease = this.leases[id];
        if (!lease || lease.timer_id)
        {
            return;
        }
        // Node.js fires setTimeout() after 1 ms for delays above 2^31-1 ms (~24.8 days),
        // so long leases are re-armed in steps instead of being revoked right away
        const MAX_TIMER_DELAY = 0x7fffffff;
        const delay = lease.expires - Date.now();
        if (delay > MAX_TIMER_DELAY)
        {
            lease.timer_id = setTimeout(() =>
            {
                lease.timer_id = null;
                this._set_expire(id);
            }, MAX_TIMER_DELAY);
        }
        else
        {
            lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }, true).catch(console.error), delay);
        }
    }

    /**
     * Grant a new lease with a random hex id.
     *
     * @param {{ TTL: number }} req - TTL in seconds
     */
    async api_grant_lease(req)
    {
        let id;
        while (!id || this.leases[id])
        {
            id = crypto.randomBytes(8).toString('hex');
        }
        const expires = Date.now() + req.TTL*1000;
        this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} };
        this.mod_revision++;
        this._set_expire(id);
        if (this.replicate)
        {
            await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl: req.TTL, expires } ] });
        }
        return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL };
    }

    /**
     * Extend lease req.ID by its TTL from now.
     *
     * @throws {Error} 'unknown lease'
     */
    async api_keepalive_lease(req)
    {
        const id = req.ID;
        if (!this.leases[id])
        {
            throw new Error('unknown lease');
        }
        const lease = this.leases[id];
        if (lease.timer_id)
        {
            clearTimeout(lease.timer_id);
            lease.timer_id = null;
        }
        const ttl = lease.ttl;
        lease.expires = Date.now() + ttl*1000;
        this.mod_revision++;
        this._set_expire(id);
        if (this.replicate)
        {
            await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl, expires: lease.expires } ] });
        }
        // extra wrapping in { result: ... }
        return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } };
    }

    /**
     * Create or update a lease from { id, ttl, expires } without replicating it.
     *
     * @returns {boolean} false when the lease already existed with the same ttl/expires
     */
    load_lease(lease)
    {
        const id = lease.id;
        if (!this.leases[id])
        {
            this.leases[id] = { ...lease, timer_id: null, keys: {} };
        }
        else if (this.leases[id].ttl != lease.ttl ||
            this.leases[id].expires != lease.expires)
        {
            this.leases[id].ttl = lease.ttl;
            this.leases[id].expires = lease.expires;
        }
        else
        {
            return false;
        }
        if (this.leases[id].timer_id)
        {
            clearTimeout(this.leases[id].timer_id);
            this.leases[id].timer_id = null;
        }
        this._set_expire(id);
        return true;
    }

    /**
     * Delete all keys attached to lease `id` and forget the lease, collecting
     * notifications but neither replicating nor sending them.
     *
     * @throws {Error} 'unknown lease'
     */
    _sync_revoke_lease(id, notifications, next_revision)
    {
        const lease = this.leases[id];
        if (!lease)
        {
            throw new Error('unknown lease');
        }
        // Stop the expiry timer, otherwise it keeps running after the revocation
        // and later tries to revoke an unknown lease
        if (lease.timer_id)
        {
            clearTimeout(lease.timer_id);
            lease.timer_id = null;
        }
        for (const key in lease.keys)
        {
            // Lease keys are stored decoded, but _delete_range() expects request (possibly base64) keys
            this._delete_range({ key: this.b64(key) }, next_revision, notifications);
        }
        delete this.leases[id];
    }

    /**
     * Revoke lease req.ID, deleting its keys; replicated and notified.
     *
     * @param {boolean} no_throw - return null instead of throwing for an unknown lease
     */
    async api_revoke_lease(req, no_throw)
    {
        const notifications = [];
        if (!this.leases[req.ID])
        {
            if (no_throw)
                return null;
            throw new Error('unknown lease');
        }
        this.mod_revision++;
        this._sync_revoke_lease(req.ID, notifications, this.mod_revision);
        if (this.replicate)
        {
            await this.notify_replicator(notifications, [ { id: req.ID } ]);
        }
        this._notify(notifications);
        return { header: { revision: this.mod_revision } };
    }

    /**
     * Pass collected changes (last event per key, without watcher lists) and
     * optional lease updates to the replicate callback.
     */
    async notify_replicator(notifications, leases)
    {
        // First replicate the change and then notify watchers about it
        const all_changes = {};
        for (const chg of notifications)
        {
            all_changes[chg.key] = { ...chg };
            delete all_changes[chg.key].watchers;
        }
        await this.replicate({ header: { revision: this.mod_revision }, events: Object.values(all_changes), leases });
    }

    /**
     * Apply a replication message of the shape passed to the replicate callback:
     * leases with a ttl are loaded, leases without one are revoked; events without
     * a value are deletions, the others are puts.
     */
    async apply_replication(msg)
    {
        this.mod_revision = msg.header.revision;
        const notifications = [];
        if ((msg.leases||[]).length)
        {
            for (const lease of msg.leases)
            {
                if (lease.ttl)
                {
                    this.load_lease(lease);
                }
                else
                {
                    this._sync_revoke_lease(lease.id, notifications, this.mod_revision);
                }
            }
        }
        if ((msg.events||[]).length)
        {
            for (const ev of msg.events)
            {
                if (ev.value == null)
                {
                    this._delete_range({ key: ev.key }, ev.mod_revision, notifications);
                }
                else
                {
                    this._put({ key: ev.key, value: ev.value, lease: ev.lease }, ev.mod_revision, notifications);
                }
            }
        }
        if (this.replicate)
        {
            await this.notify_replicator(notifications, msg.leases);
        }
        this._notify(notifications);
    }

    // forget deletions before compact_revision
    compact(compact_revision)
    {
        this._compact(compact_revision, this.state);
        this.compact_revision = compact_revision;
    }

    /**
     * Remove empty, unwatched child nodes of `cur` (depth-first) whose last
     * modification is older than compact_revision.
     */
    _compact(compact_revision, cur)
    {
        for (const key in cur.children||{})
        {
            const child = cur.children[key];
            this._compact(compact_revision, child);
            // A node may only go when it has no value, no children and no watchers:
            // dropping a watched node would detach its watchers from future changes of the key.
            // Nodes that never got a revision (empty directories, leftovers of cancelled
            // watches) are dropped as well.
            if (emptyObj(child.children) && child.value == null &&
                !child.watchers && !child.key_watchers &&
                !(child.mod_revision >= compact_revision))
            {
                delete cur.children[key];
            }
        }
    }

    /**
     * Register a watch on a single key (no range_end) or a directory range.
     * When req.start_revision is set and not newer than the current revision, the
     * changes of the watched key/subtree with mod_revision >= start_revision (including
     * not yet compacted deletions) are sent asynchronously through send().
     *
     * @returns {{ watch_id, created: true }}, or {{ compact_revision }} when
     *   start_revision is older than the last compaction
     */
    api_create_watch(req, send)
    {
        const { parts, all } = this._get_range(req);
        if (req.start_revision && this.compact_revision && this.compact_revision > req.start_revision)
        {
            // Deletions up to this.compact_revision are forgotten
            return { compact_revision: this.compact_revision };
        }
        let watch_id = req.watch_id;
        if (watch_id instanceof Object)
        {
            throw new Error('invalid watch_id');
        }
        if (!watch_id)
        {
            watch_id = ++this.watcher_id;
        }
        if (!this.watchers[watch_id])
        {
            this.watchers[watch_id] = {
                paths: [],
                send,
            };
        }
        this.watchers[watch_id].paths.push(parts);
        const { cur: node } = this._get_subtree(parts, true, false);
        if (all)
        {
            node.watchers = node.watchers || [];
            node.watchers.push(watch_id);
        }
        else
        {
            node.key_watchers = node.key_watchers || [];
            node.key_watchers.push(watch_id);
        }
        // start_revision is inclusive (_get_modified reports mod_revision >= start_revision),
        // so changes made exactly at the current revision must be sent too
        if (req.start_revision && req.start_revision <= this.mod_revision)
        {
            // Send initial changes of the watched key/subtree only
            const imm = setImmediate(() =>
            {
                this.active_immediate = this.active_immediate.filter(i => i !== imm);
                if (!this.watchers[watch_id])
                {
                    // The watch was cancelled before its initial changes were delivered
                    return;
                }
                const events = [];
                const { cur } = this._get_subtree(parts, false, false);
                if (cur)
                {
                    this._get_modified(events, cur, parts.join('/') || null, req.start_revision, all);
                }
                send({ result: { header: { revision: this.mod_revision }, events } });
            });
            this.active_immediate.push(imm);
        }
        return { watch_id, created: true };
    }

    /**
     * Collect PUT/DELETE events for `cur` (and its whole subtree when `all`)
     * whose mod_revision >= min_rev.
     *
     * @param {string|null} prefix - decoded key of cur (null for the root)
     * @param {boolean} [all=true] - descend into children
     */
    _get_modified(events, cur, prefix, min_rev, all = true)
    {
        if (cur.mod_revision >= min_rev)
        {
            const key = this.b64(prefix === null ? '' : prefix);
            let ev;
            if (cur.value == null)
            {
                ev = { type: 'DELETE', kv: { key, mod_revision: cur.mod_revision } };
            }
            else
            {
                ev = { type: 'PUT', kv: { key, value: this.b64(cur.value), mod_revision: cur.mod_revision } };
                if (cur.lease)
                {
                    ev.kv.lease = cur.lease;
                }
            }
            events.push(ev);
        }
        if (all && cur.children)
        {
            for (const k in cur.children)
            {
                this._get_modified(events, cur.children[k], prefix === null ? k : prefix+'/'+k, min_rev, true);
            }
        }
    }

    /**
     * Remove watch `watch_id` from every node it was registered on.
     * Always returns { canceled: true }, even for an unknown id.
     */
    api_cancel_watch(watch_id)
    {
        if (this.watchers[watch_id])
        {
            for (const parts of this.watchers[watch_id].paths)
            {
                const { cur } = this._get_subtree(parts, false, false);
                if (cur)
                {
                    if (cur.watchers)
                    {
                        cur.watchers = cur.watchers.filter(id => id != watch_id);
                        if (!cur.watchers.length)
                            cur.watchers = null;
                    }
                    if (cur.key_watchers)
                    {
                        cur.key_watchers = cur.key_watchers.filter(id => id != watch_id);
                        if (!cur.key_watchers.length)
                            cur.key_watchers = null;
                    }
                }
            }
            delete this.watchers[watch_id];
        }
        return { canceled: true };
    }

    /**
     * Group notifications by watch and send one message per watch. Only the last event
     * of each key is kept; events with a 'value' field are PUTs, the others DELETEs.
     * Mutates the notifications (strips their watcher lists).
     */
    _notify(notifications)
    {
        if (!notifications.length)
        {
            return;
        }
        const by_watcher = {};
        for (const notif of notifications)
        {
            const watchers = notif.watchers;
            delete notif.watchers;
            const conv = { type: ('value' in notif) ? 'PUT' : 'DELETE', kv: notif };
            for (const wid of watchers)
            {
                if (this.watchers[wid])
                {
                    by_watcher[wid] = by_watcher[wid] || { header: { revision: this.mod_revision }, events: {} };
                    by_watcher[wid].events[notif.key] = conv;
                }
            }
        }
        for (const wid in by_watcher)
        {
            by_watcher[wid].events = Object.values(by_watcher[wid].events);
            this.watchers[wid].send({ result: by_watcher[wid] });
        }
    }

    /**
     * Execute a transaction: evaluate all comparisons, then apply either `success`
     * or `failure` actions at the next revision, replicate and notify watchers.
     *
     * @throws {Error} for unsupported comparisons/ranges or unknown leases; in that
     *   case no action is applied
     */
    async api_txn({ compare, success, failure })
    {
        const failed = (compare || []).filter(chk => !this._check(chk)).length > 0;
        const actions = (failed ? failure : success) || [];
        // Validate every action before applying any of them, so that an invalid request
        // can't leave the transaction half-applied with unreplicated, unnotified changes
        for (const req of actions)
        {
            this._validate_txn_action(req);
        }
        const responses = [];
        const notifications = [];
        const next_revision = this.mod_revision + 1;
        for (const req of actions)
        {
            responses.push(this._txn_action(req, next_revision, notifications));
        }
        if (this.replicate && notifications.length)
        {
            // First replicate the change and then notify watchers about it
            await this.notify_replicator(notifications);
        }
        this._notify(notifications);
        return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
    }

    /**
     * Throw the error the given transaction action would throw while being applied
     * (unsupported range, unknown lease), without modifying anything.
     * Mirrors the dispatch order of _txn_action().
     */
    _validate_txn_action(req)
    {
        const range = req.request_range || req.requestRange;
        const put = req.request_put || req.requestPut;
        const del = req.request_delete_range || req.requestDeleteRange;
        if (range)
        {
            this._get_range(range);
        }
        else if (put)
        {
            if (put.lease && !this.leases[put.lease])
            {
                throw new Error('unknown lease: '+put.lease);
            }
        }
        else if (del)
        {
            this._get_range(del);
        }
    }

    /** Apply one transaction action (range, put or delete_range; snake_case or camelCase). */
    _txn_action(req, cur_revision, notifications)
    {
        if (req.request_range || req.requestRange)
        {
            return { response_range: this._range(req.request_range || req.requestRange) };
        }
        else if (req.request_put || req.requestPut)
        {
            return { response_put: this._put(req.request_put || req.requestPut, cur_revision, notifications) };
        }
        else if (req.request_delete_range || req.requestDeleteRange)
        {
            return { response_delete_range: this._delete_range(req.request_delete_range || req.requestDeleteRange, cur_revision, notifications) };
        }
        return {};
    }

    /** Read a key or a directory range; supports `limit` and `keys_only`. */
    _range(request_range)
    {
        // FIXME: revision(-), sort_order, sort_target, serializable(-),
        // count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
        const { parts, all } = this._get_range(request_range);
        const { cur } = this._get_subtree(parts, false, false);
        const kvs = [];
        if (cur)
        {
            this._get_all(kvs, cur, all, parts.join('/') || null, request_range);
        }
        return { kvs };
    }

    /**
     * Set a key (optionally attached to a lease) at cur_revision. Nothing happens and
     * no event is generated when both value and lease are unchanged.
     *
     * @throws {Error} 'unknown lease: ...' - before any modification
     */
    _put(request_put, cur_revision, notifications)
    {
        // FIXME: prev_kv, ignore_value(?), ignore_lease(?)
        const parts = this._key_parts(this.de64(request_put.key));
        const key = parts.join('/');
        const value = this.de64(request_put.value);
        const { cur, watchers } = this._get_subtree(parts, true, true);
        if (cur.key_watchers)
        {
            watchers.push.apply(watchers, cur.key_watchers);
        }
        if (!eq(cur.value, value) || cur.lease != request_put.lease)
        {
            // Check the new lease first so that a failed put doesn't detach the key from its current lease
            if (request_put.lease && !this.leases[request_put.lease])
            {
                throw new Error('unknown lease: '+request_put.lease);
            }
            if (cur.lease && this.leases[cur.lease])
            {
                delete this.leases[cur.lease].keys[key];
            }
            if (request_put.lease)
            {
                cur.lease = request_put.lease;
                this.leases[request_put.lease].keys[key] = true;
            }
            else if (cur.lease)
            {
                cur.lease = null;
            }
            this.mod_revision = cur_revision;
            cur.version = (cur.version||0) + 1;
            cur.mod_revision = cur_revision;
            if (cur.value == null)
            {
                cur.create_revision = cur_revision;
            }
            cur.value = value;
            const notify = { watchers, key: this.b64(key), value: this.b64(value), mod_revision: cur.mod_revision };
            if (cur.lease)
            {
                notify.lease = cur.lease;
            }
            notifications.push(notify);
        }
        return {};
    }

    /**
     * Delete a key or a directory range at cur_revision.
     *
     * @returns {{ deleted: number }} number of keys that had a value
     */
    _delete_range(request_delete_range, cur_revision, notifications)
    {
        // FIXME: prev_kv
        const { parts, all } = this._get_range(request_delete_range);
        const { cur, watchers } = this._get_subtree(parts, false, true);
        const prevcount = notifications.length;
        if (cur)
        {
            this._delete_all(notifications, watchers, cur, all, parts.join('/') || null, cur_revision);
        }
        return { deleted: notifications.length-prevcount };
    }

    /** Collect key-values of `cur` (and its subtree when `all`), at most req.limit items. */
    _get_all(kvs, cur, all, prefix, req)
    {
        // >= so that exactly req.limit items are returned, not limit+1
        if (req.limit && kvs.length >= req.limit)
        {
            return;
        }
        if (cur.value != null)
        {
            const item = { key: this.b64(prefix === null ? '' : prefix) };
            if (!req.keys_only)
            {
                item.value = this.b64(cur.value);
                item.mod_revision = cur.mod_revision;
                //item.create_revision = cur.create_revision;
                //item.version = cur.version;
                if (cur.lease)
                {
                    item.lease = cur.lease;
                }
            }
            kvs.push(item);
        }
        if (all && cur.children)
        {
            for (let k in cur.children)
            {
                this._get_all(kvs, cur.children[k], true, prefix === null ? k : prefix+'/'+k, req);
            }
        }
    }

    /**
     * Turn `cur` (and its subtree when `all`) into tombstones: values are cleared but
     * nodes are kept, so deletions can still be reported to watchers until compaction.
     */
    _delete_all(notifications, watchers, cur, all, prefix, cur_revision)
    {
        if (cur.value != null)
        {
            // Do not actually forget the key until the deletion is confirmed by all replicas
            // ...and until it's not required by watchers
            if (cur.lease && this.leases[cur.lease])
            {
                delete this.leases[cur.lease].keys[prefix === null ? '' : prefix];
            }
            cur.value = null;
            cur.version = 0;
            cur.create_revision = null;
            cur.mod_revision = cur_revision;
            this.mod_revision = cur_revision;
            notifications.push({
                watchers: cur.key_watchers ? [ ...watchers, ...cur.key_watchers ] : watchers,
                key: this.b64(prefix === null ? '' : prefix),
                mod_revision: cur_revision,
            });
        }
        if (all && cur.children)
        {
            for (let k in cur.children)
            {
                const subw = cur.children[k].watchers ? [ ...watchers, ...cur.children[k].watchers ] : watchers;
                this._delete_all(notifications, subw, cur.children[k], true, prefix === null ? k : prefix+'/'+k, cur_revision);
            }
        }
    }
}
/**
 * Compare two tree values.
 * If either side is an object, both are compared by their stable JSON serialization.
 * Otherwise loose equality is used, so null and undefined count as equal.
 */
function eq(a, b)
{
    const either_object = (a instanceof Object) || (b instanceof Object);
    if (!either_object)
    {
        return a == b;
    }
    return stableStringify(a) === stableStringify(b);
}
/**
 * True when `obj` is falsy (e.g. null/undefined) or has no enumerable keys,
 * own or inherited (for..in semantics).
 */
function emptyObj(obj)
{
    let has_key = false;
    if (obj)
    {
        for (const _ in obj)
        {
            has_key = true;
            break;
        }
    }
    return !has_key;
}
// Export the class and expose the value comparison helper as EtcTree.eq
module.exports = Object.assign(EtcTree, { eq });