From 899c06faed91c74514e0bf8cfe05ecd9d69a0df5 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Sat, 8 Jul 2023 02:05:23 +0300 Subject: [PATCH] I want to make mini-etcd out of it --- etctree.js | 393 ++++++++++++++++++++++++++++++++++++++++++++++++ etctree.spec.js | 59 ++++++++ 2 files changed, 452 insertions(+) create mode 100644 etctree.js create mode 100644 etctree.spec.js diff --git a/etctree.js b/etctree.js new file mode 100644 index 0000000..ea141a0 --- /dev/null +++ b/etctree.js @@ -0,0 +1,393 @@ +const crypto = require('crypto'); + +/*type TreeNode = { + value?: any, + create_revision?: number, + mod_revision?: number, + version?: number, + lease?: string, + children: { [string]: TreeNode }, + watchers?: number[], + key_watchers?: number[], +};*/ + +class EtcTree +{ + constructor() + { + this.state = {}; + this.leases = {}; + this.watchers = {}; + this.watcher_id = 0; + this.mod_revision = 0; + } + + check(chk) + { + const parts = this.key_parts(chk.key); + const { cur } = this.get_subtree(parts, false, false); + let check_value, ref_value; + if (chk.target === 'MOD') + { + check_value = cur.mod_revision || 0; + ref_value = chk.mod_revision || 0; + } + else if (chk.target === 'CREATE') + { + check_value = cur.create_revision || 0; + ref_value = chk.create_revision || 0; + } + else if (chk.target === 'VERSION') + { + check_value = cur.version || 0; + ref_value = chk.version || 0; + } + else if (chk.target === 'LEASE') + { + check_value = cur.lease; + ref_value = chk.lease; + } + else + { + throw new Error('Unsupported comparison target: '+chk.target); + } + if (chk.result === 'LESS') + { + return check_value < ref_value; + } + else if (chk.result) + { + throw new Error('Unsupported comparison result: '+chk.result); + } + return check_value == ref_value; + } + + key_parts(key) + { + const parts = key.replace(/\/\/+/g, '/').replace(/^\/|\/$/g, ''); + return parts === '' ? [] : parts.split('/'); + } + + get_range(req) + { + const key = req.key; + const end = req.range_end; + if (end != null && (key[key.length-1] != '/' || end[end.length-1] != '0' || + end.substr(0, end.length-1) !== key.substr(0, key.length-1))) + { + throw new Error('Non-directory range queries are unsupported'); + } + const parts = this.key_parts(key); + return { parts, all: end != null }; + } + + get_subtree(parts, create, notify) + { + let cur = this.state; + let watchers = notify ? [] : null; + for (let k of parts) + { + if (notify && cur.watchers) + { + watchers.push.apply(watchers, cur.watchers); + } + if (!cur.children) + { + if (!create) + { + return {}; + } + cur.children = {}; + } + if (!cur.children[k]) + { + if (!create) + { + return {}; + } + cur.children[k] = {}; + } + cur = cur.children[k]; + } + if (notify && cur.watchers) + { + watchers.push.apply(watchers, cur.watchers); + } + if (notify && cur.key_watchers) + { + watchers.push.apply(watchers, cur.key_watchers); + } + return { watchers, cur }; + } + + api_grant_lease(req) + { + let id; + while (!id || this.leases[id]) + { + id = crypto.randomBytes(8).toString('base64'); + } + const timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), req.TTL*1000); + this.leases[id] = { ttl: req.TTL, timer_id, keys: {} }; + return { ID: id }; + } + + api_keepalive_lease(req) + { + const id = req.ID; + if (!this.leases[id]) + { + throw new Error('unknown lease'); + } + clearTimeout(this.leases[id].timer_id); + const ttl = this.leases[id].TTL; + this.leases[id].timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), ttl*1000); + return { TTL: ttl }; + } + + api_revoke_lease(req) + { + if (!this.leases[req.ID]) + { + throw new Error('unknown lease'); + } + const next_revision = this.mod_revision + 1; + const notifications = []; + for (const key in this.leases[req.ID].keys) + { + this.txn_action({ request_delete_range: { key } }, next_revision, notifications); + } + this.notify(notifications); + } + + api_create_watch(req, send) + { + const { parts, all } = this.get_range(req); + if (req.start_revision && req.start_revision < this.mod_revision) + { + throw new Error('history storage is not implemented'); + } + let watch_id = req.watch_id; + if (watch_id instanceof Object) + { + throw new Error('invalid watch_id'); + } + if (!watch_id) + { + watch_id = ++this.watcher_id; + } + if (!this.watchers[watch_id]) + { + this.watchers[watch_id] = { + paths: [], + send, + }; + } + this.watchers[watch_id].paths.push(parts); + const { cur } = this.get_subtree(parts, true, false); + if (all) + { + cur.watchers = cur.watchers || []; + cur.watchers.push(watch_id); + } + else + { + cur.key_watchers = cur.key_watchers || []; + cur.key_watchers.push(watch_id); + } + return { watch_id, created: true }; + } + + api_cancel_watch(watch_id) + { + if (this.watchers[watch_id]) + { + for (const parts of this.watchers[watch_id].paths) + { + const { cur } = this.get_subtree(parts, false, false); + if (cur) + { + if (cur.watchers) + { + cur.watchers = cur.watchers.filter(id => id != watch_id); + if (!cur.watchers.length) + cur.watchers = null; + } + if (cur.key_watchers) + { + cur.key_watchers = cur.key_watchers.filter(id => id != watch_id); + if (!cur.key_watchers.length) + cur.key_watchers = null; + } + // FIXME: cleanup deleted tree paths + } + } + delete this.watchers[watch_id]; + } + } + + api_txn({ compare, success, failure }) + { + const failed = (compare || []).filter(chk => !this.check(chk)).length > 0; + const responses = []; + const notifications = []; + const next_revision = this.mod_revision + 1; + for (const req of (failed ? failure : success) || []) + { + responses.push(this.txn_action(req, next_revision, notifications)); + } + this.notify(notifications); + return { revision: this.mod_revision, succeeded: !failed, responses }; + } + + notify(notifications) + { + if (!notifications.length) + { + return; + } + const by_watcher = {}; + for (const notif of notifications) + { + const watchers = notif.watchers; + delete notif.watchers; + for (const wid of watchers) + { + if (this.watchers[wid]) + { + by_watcher[wid] = by_watcher[wid] || { header: { revision: this.mod_revision }, events: {} }; + by_watcher[wid].events[notif.key] = notif; + } + } + } + for (const wid in by_watcher) + { + by_watcher[wid].events = Object.values(by_watcher[wid].events); + this.watchers[wid].send(by_watcher[wid]); + } + } + + txn_action(req, cur_revision, notifications) + { + if (req.request_range) + { + // FIXME: limit, revision(-), sort_order, sort_target, serializable(-), keys_only, + // count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision + const { parts, all } = this.get_range(req.request_range); + const { cur } = this.get_subtree(parts, false, false); + const kvs = []; + if (cur) + { + this.get_all(kvs, cur, all, parts.join('/'), req.request_range); + } + return { kvs }; + } + else if (req.request_put) + { + // FIXME: prev_kv, ignore_value(?), ignore_lease(?) + const parts = this.key_parts(req.request_put.key); + const key = parts.join('/'); + const value = req.request_put.value; + const { cur, watchers } = this.get_subtree(parts, true, true); + if (!eq(cur.value, value) || cur.lease != req.request_put.lease) + { + if (cur.lease && this.leases[cur.lease]) + { + delete this.leases[cur.lease].keys[key]; + } + if (req.request_put.lease) + { + if (!this.leases[req.request_put.lease]) + { + throw new Error('unknown lease: '+req.request_put.lease); + } + cur.lease = req.request_put.lease; + this.leases[req.request_put.lease].keys[key] = true; + } + this.mod_revision = cur_revision; + cur.version = (cur.version||0) + 1; + cur.mod_revision = cur_revision; + if (cur.value == null) + { + cur.create_revision = cur_revision; + } + cur.value = value; + const notify = { watchers, key, value, mod_revision: cur.mod_revision }; + if (cur.lease) + { + notify.lease = cur.lease; + } + notifications.push(notify); + } + return {}; + } + else if (req.request_delete_range) + { + // FIXME: prev_kv + const { parts, all } = this.get_range(req.request_delete_range); + const { cur, watchers } = this.get_subtree(parts, false, true); + if (cur) + { + this.delete_all(notifications, watchers, cur, all, parts.join('/'), cur_revision); + } + } + } + + get_all(kvs, cur, all, prefix, req) + { + if (req.limit && kvs.length > req.limit) + { + return; + } + if (cur.value != null) + { + if (req.keys_only) + { + kvs.push({ key: prefix, mod_revision: cur.mod_revision }); + } + else + { + kvs.push({ key: prefix, value: cur.value, mod_revision: cur.mod_revision }); + } + } + if (all && cur.children) + { + for (let k in cur.children) + { + this.get_all(kvs, cur.children[k], true, prefix === '' ? k : prefix+'/'+k, req); + } + } + } + + delete_all(notifications, watchers, cur, all, prefix, cur_revision) + { + if (cur.value != null) + { + // Do not actually forget the key until the deletion is confirmed by all replicas + cur.value = null; + cur.create_revision = null; + cur.mod_revision = cur_revision; + this.mod_revision = cur_revision; + notifications.push({ watchers, key: prefix, mod_revision: cur_revision }); + } + if (all && cur.children) + { + for (let k in cur.children) + { + const subw = cur.children[k].watchers ? [ ...watchers, ...cur.children[k].watchers ] : watchers; + this.delete_all(notifications, subw, cur.children[k], true, prefix === '' ? k : prefix+'/'+k, cur_revision); + } + } + } +} + +function eq(a, b) +{ + if (a instanceof Object || b instanceof Object) + { + return JSON.stringify(a) === JSON.stringify(b); + } + return a == b; +} + +module.exports = EtcTree; diff --git a/etctree.spec.js b/etctree.spec.js new file mode 100644 index 0000000..659440d --- /dev/null +++ b/etctree.spec.js @@ -0,0 +1,59 @@ +const EtcTree = require('./etctree.js'); + +describe('EtcTree', () => +{ + it('should return the inserted value', () => + { + const t = new EtcTree(); + expect(t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ {} ] }); + expect(t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ { kvs: [ { key: 'vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ] } ] }); + expect(t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ { kvs: [ { key: 'vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ] } ] }); + expect(t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ { kvs: [] } ] }); + expect(t.api_txn({ + compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 1, result: 'LESS' } ], + success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ], + failure: [ { request_range: { key: 'vitastor/config/global' } } ], + })).toEqual({ succeeded: false, revision: 1, responses: [ { kvs: [ { key: 'vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ] } ] }); + expect(t.api_txn({ + compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 2, result: 'LESS' } ], + success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world2' } } } ] + })).toEqual({ succeeded: true, revision: 2, responses: [ {} ] }); + expect(t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] })) + .toEqual({ succeeded: true, revision: 2, responses: [ { kvs: [ { key: 'vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ] } ] }); + }); + + it('should watch', () => + { + const t = new EtcTree(); + const sent = []; + const send = (event) => sent.push(event); + expect(t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ {} ] }); + expect(t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send)) + .toEqual({ watch_id: 1, created: true }); + expect(sent).toEqual([]); + expect(t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] })) + .toEqual({ succeeded: true, revision: 2, responses: [ {} ] }); + expect(sent).toEqual([ { header: { revision: 2 }, events: [ { key: 'vitastor/osd/state/1', mod_revision: 2, value: { ip: '1.2.3.4' } } ] } ]); + }); + + it('should lease', async () => + { + const t = new EtcTree(); + const sent = []; + const send = (event) => sent.push(event); + const leaseID = t.api_grant_lease({ TTL: 0.5 }).ID; + expect(leaseID).not.toBeNull(); + expect(t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] })) + .toEqual({ succeeded: true, revision: 1, responses: [ {} ] }); + expect(t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send)) + .toEqual({ watch_id: 1, created: true }); + expect(sent).toEqual([]); + await new Promise(ok => setTimeout(ok, 600)); + expect(sent).toEqual([ { header: { revision: 2 }, events: [ { key: 'vitastor/osd/state/1', mod_revision: 2 } ] } ]); + }); +});