const crypto = require('crypto');

/*type TreeNode = {
    value?: any,
    create_revision?: number,
    mod_revision?: number,
    version?: number,
    lease?: string,
    children: { [string]: TreeNode },
    watchers?: number[],
    key_watchers?: number[],
};*/

/**
 * In-memory hierarchical key/value store exposing a subset of etcd-like
 * operations: transactions (range / put / delete_range), compares, leases
 * and watches.
 *
 * Keys are split on '/' and stored as a tree of TreeNode objects rooted at
 * `this.state`. Only exact-key and "directory" (`key = 'dir/'`,
 * `range_end = 'dir0'`) ranges are supported.
 */
class EtcTree {
    constructor() {
        // Root TreeNode
        this.state = {};
        // lease ID => { ttl, timer_id, keys: { [normalized key]: true } }
        this.leases = {};
        // watch ID => { paths: string[][], send: function }
        this.watchers = {};
        // Last automatically assigned watch ID
        this.watcher_id = 0;
        // Revision of the last modification applied to the tree
        this.mod_revision = 0;
    }

    /**
     * Evaluate a single transaction compare.
     *
     * @param {object} chk - `{ key, target, result?, mod_revision?, create_revision?, version?, lease? }`.
     *   `target` is one of 'MOD', 'CREATE', 'VERSION', 'LEASE'.
     *   `result` is one of 'EQUAL' (default when omitted), 'NOT_EQUAL', 'LESS', 'GREATER'.
     * @returns {boolean} whether the stored value satisfies the comparison.
     * @throws {Error} on an unsupported target or result.
     */
    check(chk) {
        const parts = this.key_parts(chk.key);
        // A key that was never written is compared as an empty node (all
        // counters are 0) instead of crashing on an undefined node.
        const cur = this.get_subtree(parts, false, false).cur || {};
        let check_value, ref_value;
        if (chk.target === 'MOD') {
            check_value = cur.mod_revision || 0;
            ref_value = chk.mod_revision || 0;
        } else if (chk.target === 'CREATE') {
            check_value = cur.create_revision || 0;
            ref_value = chk.create_revision || 0;
        } else if (chk.target === 'VERSION') {
            check_value = cur.version || 0;
            ref_value = chk.version || 0;
        } else if (chk.target === 'LEASE') {
            check_value = cur.lease;
            ref_value = chk.lease;
        } else {
            throw new Error('Unsupported comparison target: '+chk.target);
        }
        // Loose (in)equality is kept from the original implementation;
        // presumably reference values may arrive as numeric strings.
        switch (chk.result || 'EQUAL') {
            case 'EQUAL':
                return check_value == ref_value;
            case 'NOT_EQUAL':
                return check_value != ref_value;
            case 'LESS':
                return check_value < ref_value;
            case 'GREATER':
                return check_value > ref_value;
            default:
                throw new Error('Unsupported comparison result: '+chk.result);
        }
    }

    /**
     * Normalize a key and split it into path components.
     * Repeated slashes are collapsed, leading/trailing slashes are dropped.
     *
     * @param {string} key
     * @returns {string[]} path components; empty array for the root.
     */
    key_parts(key) {
        const parts = key.replace(/\/\/+/g, '/').replace(/^\/|\/$/g, '');
        return parts === '' ? [] : parts.split('/');
    }

    /**
     * Parse the `key` / `range_end` pair of a request.
     *
     * @param {object} req - `{ key, range_end? }`
     * @returns {{ parts: string[], all: boolean }} path of the key and whether
     *   the whole subtree under it is addressed.
     * @throws {Error} if `range_end` is set but is not `key` with the trailing
     *   '/' replaced by '0' (i.e. not a directory range).
     */
    get_range(req) {
        const key = req.key;
        const end = req.range_end;
        if (end != null && (
            key[key.length-1] != '/' ||
            end[end.length-1] != '0' ||
            end.slice(0, -1) !== key.slice(0, -1)
        )) {
            throw new Error('Non-directory range queries are unsupported');
        }
        const parts = this.key_parts(key);
        return { parts, all: end != null };
    }

    /**
     * Walk the tree down to the node addressed by `parts`.
     *
     * @param {string[]} parts - path components
     * @param {boolean} create - create missing nodes on the way
     * @param {boolean} notify - collect IDs of watchers interested in this node
     * @returns {{ cur?: object, watchers?: number[]|null, prefix_watchers?: number[]|null }}
     *   `{}` when the node does not exist and `create` is false. Otherwise:
     *   `cur` is the node; `prefix_watchers` are subtree watchers of the node
     *   and all its ancestors; `watchers` is `prefix_watchers` plus the exact
     *   key watchers of the node. Both lists are null when `notify` is false.
     */
    get_subtree(parts, create, notify) {
        let cur = this.state;
        const prefix_watchers = notify ? [] : null;
        for (const k of parts) {
            if (notify && cur.watchers) {
                prefix_watchers.push(...cur.watchers);
            }
            if (!cur.children) {
                if (!create) {
                    return {};
                }
                cur.children = {};
            }
            if (!cur.children[k]) {
                if (!create) {
                    return {};
                }
                cur.children[k] = {};
            }
            cur = cur.children[k];
        }
        if (notify && cur.watchers) {
            prefix_watchers.push(...cur.watchers);
        }
        const watchers = notify
            ? [ ...prefix_watchers, ...(cur.key_watchers || []) ]
            : null;
        return { watchers, prefix_watchers, cur };
    }

    /**
     * Create a lease which is revoked automatically after `req.TTL` seconds
     * unless it is kept alive.
     *
     * @param {{ TTL: number }} req
     * @returns {{ ID: string }} randomly generated lease ID
     */
    api_grant_lease(req) {
        let id;
        while (!id || this.leases[id]) {
            id = crypto.randomBytes(8).toString('base64');
        }
        const timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), req.TTL*1000);
        this.leases[id] = { ttl: req.TTL, timer_id, keys: {} };
        return { ID: id };
    }

    /**
     * Restart the expiration timer of a lease.
     *
     * @param {{ ID: string }} req
     * @returns {{ TTL: number }} TTL the lease was granted with
     * @throws {Error} 'unknown lease' if the lease does not exist
     */
    api_keepalive_lease(req) {
        const id = req.ID;
        const lease = this.leases[id];
        if (!lease) {
            throw new Error('unknown lease');
        }
        clearTimeout(lease.timer_id);
        // The TTL is stored as `ttl` by api_grant_lease()
        const ttl = lease.ttl;
        lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), ttl*1000);
        return { TTL: ttl };
    }

    /**
     * Revoke a lease: delete every key attached to it, notify watchers and
     * forget the lease.
     *
     * @param {{ ID: string }} req
     * @throws {Error} 'unknown lease' if the lease does not exist
     */
    api_revoke_lease(req) {
        const id = req.ID;
        const lease = this.leases[id];
        if (!lease) {
            throw new Error('unknown lease');
        }
        // Stop the expiration timer so that it does not fire for a lease
        // which is already gone.
        clearTimeout(lease.timer_id);
        const next_revision = this.mod_revision + 1;
        const notifications = [];
        // Iterate over a snapshot: deleting a key detaches it from lease.keys
        for (const key of Object.keys(lease.keys)) {
            this.txn_action({ request_delete_range: { key } }, next_revision, notifications);
        }
        delete this.leases[id];
        this.notify(notifications);
    }

    /**
     * Register a watch on a key or on a directory range.
     *
     * @param {object} req - `{ key, range_end?, start_revision?, watch_id? }`
     * @param {function} send - callback receiving `{ header: { revision }, events }`;
     *   only stored when the watch ID is not registered yet
     * @returns {{ watch_id: (number|string), created: boolean }}
     * @throws {Error} if `start_revision` refers to the past, if `watch_id`
     *   is an object or if the range is unsupported
     */
    api_create_watch(req, send) {
        const { parts, all } = this.get_range(req);
        if (req.start_revision && req.start_revision < this.mod_revision) {
            throw new Error('history storage is not implemented');
        }
        let watch_id = req.watch_id;
        if (watch_id instanceof Object) {
            throw new Error('invalid watch_id');
        }
        if (!watch_id) {
            watch_id = ++this.watcher_id;
        }
        if (!this.watchers[watch_id]) {
            this.watchers[watch_id] = {
                paths: [],
                send,
            };
        }
        this.watchers[watch_id].paths.push(parts);
        const { cur } = this.get_subtree(parts, true, false);
        if (all) {
            cur.watchers = cur.watchers || [];
            cur.watchers.push(watch_id);
        } else {
            cur.key_watchers = cur.key_watchers || [];
            cur.key_watchers.push(watch_id);
        }
        return { watch_id, created: true };
    }

    /**
     * Remove a watch from every tree node it was attached to.
     * Unknown watch IDs are ignored.
     *
     * @param {number|string} watch_id
     */
    api_cancel_watch(watch_id) {
        const watcher = this.watchers[watch_id];
        if (!watcher) {
            return;
        }
        for (const parts of watcher.paths) {
            const { cur } = this.get_subtree(parts, false, false);
            if (!cur) {
                continue;
            }
            // Loose comparison: stored IDs and the argument may differ in type
            if (cur.watchers) {
                cur.watchers = cur.watchers.filter(id => id != watch_id);
                if (!cur.watchers.length) {
                    cur.watchers = null;
                }
            }
            if (cur.key_watchers) {
                cur.key_watchers = cur.key_watchers.filter(id => id != watch_id);
                if (!cur.key_watchers.length) {
                    cur.key_watchers = null;
                }
            }
            // FIXME: cleanup deleted tree paths
        }
        delete this.watchers[watch_id];
    }

    /**
     * Execute a transaction: evaluate all compares, then run either the
     * `success` or the `failure` request list and notify watchers.
     *
     * @param {object} txn - `{ compare?, success?, failure? }`
     * @returns {{ revision: number, succeeded: boolean, responses: Array }}
     */
    api_txn({ compare, success, failure }) {
        const failed = (compare || []).filter(chk => !this.check(chk)).length > 0;
        const responses = [];
        const notifications = [];
        // All modifications of one transaction share the same revision
        const next_revision = this.mod_revision + 1;
        for (const req of (failed ? failure : success) || []) {
            responses.push(this.txn_action(req, next_revision, notifications));
        }
        this.notify(notifications);
        return { revision: this.mod_revision, succeeded: !failed, responses };
    }

    /**
     * Deliver collected notifications to watchers. Each watcher receives a
     * single message; when a key was changed several times only its last
     * notification is kept.
     *
     * @param {Array<object>} notifications - `{ watchers, key, ... }` items;
     *   the `watchers` field is removed from every item
     */
    notify(notifications) {
        if (!notifications.length) {
            return;
        }
        const by_watcher = {};
        for (const notif of notifications) {
            const watchers = notif.watchers;
            delete notif.watchers;
            for (const wid of watchers) {
                if (this.watchers[wid]) {
                    by_watcher[wid] = by_watcher[wid] || { header: { revision: this.mod_revision }, events: {} };
                    by_watcher[wid].events[notif.key] = notif;
                }
            }
        }
        for (const wid in by_watcher) {
            by_watcher[wid].events = Object.values(by_watcher[wid].events);
            this.watchers[wid].send(by_watcher[wid]);
        }
    }

    /**
     * Execute one request of a transaction.
     *
     * @param {object} req - one of `{ request_range }`, `{ request_put }`,
     *   `{ request_delete_range }`
     * @param {number} cur_revision - revision assigned to modifications
     * @param {Array<object>} notifications - output list of watcher notifications
     * @returns {object|undefined} `{ kvs }` for a range request, `{}` for put
     *   and delete requests, undefined for an unrecognized request
     * @throws {Error} on an unsupported range or an unknown lease
     */
    txn_action(req, cur_revision, notifications) {
        if (req.request_range) {
            // FIXME: revision(-), sort_order, sort_target, serializable(-),
            // count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
            const { parts, all } = this.get_range(req.request_range);
            const { cur } = this.get_subtree(parts, false, false);
            const kvs = [];
            if (cur) {
                this.get_all(kvs, cur, all, parts.join('/'), req.request_range);
            }
            return { kvs };
        } else if (req.request_put) {
            // FIXME: prev_kv, ignore_value(?), ignore_lease(?)
            const put = req.request_put;
            const parts = this.key_parts(put.key);
            const key = parts.join('/');
            const value = put.value;
            // Validate the lease before touching the tree so that a failed
            // put does not leave partially applied changes behind.
            if (put.lease && !this.leases[put.lease]) {
                throw new Error('unknown lease: '+put.lease);
            }
            const { cur, watchers } = this.get_subtree(parts, true, true);
            if (!eq(cur.value, value) || cur.lease != put.lease) {
                if (cur.lease && this.leases[cur.lease]) {
                    delete this.leases[cur.lease].keys[key];
                }
                if (put.lease) {
                    cur.lease = put.lease;
                    this.leases[put.lease].keys[key] = true;
                } else {
                    // A put without a lease detaches the key from its old lease
                    delete cur.lease;
                }
                this.mod_revision = cur_revision;
                cur.version = (cur.version||0) + 1;
                cur.mod_revision = cur_revision;
                if (cur.value == null) {
                    cur.create_revision = cur_revision;
                }
                cur.value = value;
                const notify = { watchers, key, value, mod_revision: cur.mod_revision };
                if (cur.lease) {
                    notify.lease = cur.lease;
                }
                notifications.push(notify);
            }
            return {};
        } else if (req.request_delete_range) {
            // FIXME: prev_kv
            const { parts, all } = this.get_range(req.request_delete_range);
            // delete_all() adds exact key watchers of every deleted node itself
            const { cur, prefix_watchers } = this.get_subtree(parts, false, true);
            if (cur) {
                this.delete_all(notifications, prefix_watchers, cur, all, parts.join('/'), cur_revision);
            }
            return {};
        }
    }

    /**
     * Collect key/value pairs of a node and, optionally, of its whole subtree.
     *
     * @param {Array<object>} kvs - output list
     * @param {object} cur - node to start from
     * @param {boolean} all - descend into children
     * @param {string} prefix - normalized key of `cur`
     * @param {object} req - range request; `limit` and `keys_only` are honoured
     */
    get_all(kvs, cur, all, prefix, req) {
        // Stop as soon as exactly `limit` entries are collected
        if (req.limit && kvs.length >= req.limit) {
            return;
        }
        if (cur.value != null) {
            if (req.keys_only) {
                kvs.push({ key: prefix, mod_revision: cur.mod_revision });
            } else {
                kvs.push({ key: prefix, value: cur.value, mod_revision: cur.mod_revision });
            }
        }
        if (all && cur.children) {
            for (const k in cur.children) {
                this.get_all(kvs, cur.children[k], true, prefix === '' ? k : prefix+'/'+k, req);
            }
        }
    }

    /**
     * Delete the value of a node and, optionally, of its whole subtree.
     *
     * @param {Array<object>} notifications - output list of watcher notifications
     * @param {number[]} watchers - subtree watchers of `cur` and its ancestors;
     *   exact key watchers of every deleted node are added by this method
     * @param {object} cur - node to start from
     * @param {boolean} all - descend into children
     * @param {string} prefix - normalized key of `cur`
     * @param {number} cur_revision - revision assigned to the deletion
     */
    delete_all(notifications, watchers, cur, all, prefix, cur_revision) {
        if (cur.value != null) {
            if (cur.lease) {
                // Detach the key from its lease
                if (this.leases[cur.lease]) {
                    delete this.leases[cur.lease].keys[prefix];
                }
                delete cur.lease;
            }
            // Do not actually forget the key until the deletion is confirmed by all replicas
            cur.value = null;
            cur.create_revision = null;
            // A re-created key starts counting versions from scratch
            cur.version = 0;
            cur.mod_revision = cur_revision;
            this.mod_revision = cur_revision;
            const own_watchers = cur.key_watchers
                ? [ ...watchers, ...cur.key_watchers ]
                : watchers;
            notifications.push({ watchers: own_watchers, key: prefix, mod_revision: cur_revision });
        }
        if (all && cur.children) {
            for (const k in cur.children) {
                const child = cur.children[k];
                // Only subtree watchers are inherited by descendants
                const subw = child.watchers ? [ ...watchers, ...child.watchers ] : watchers;
                this.delete_all(notifications, subw, child, true, prefix === '' ? k : prefix+'/'+k, cur_revision);
            }
        }
    }
}

/**
 * Compare two stored values. Objects are compared by their JSON
 * representation, everything else with loose equality.
 *
 * @param {*} a
 * @param {*} b
 * @returns {boolean}
 */
function eq(a, b) {
    if (a instanceof Object || b instanceof Object) {
        return JSON.stringify(a) === JSON.stringify(b);
    }
    return a == b;
}

module.exports = EtcTree;