diff --git a/etctree.js b/etctree.js index b0cd740..f6a98e8 100644 --- a/etctree.js +++ b/etctree.js @@ -22,6 +22,8 @@ class EtcTree this.watcher_id = 0; this.mod_revision = 0; this.use_base64 = use_base64; + this.paused = false; + this.on_expire_lease = null; } de64(k) @@ -129,13 +131,197 @@ class EtcTree { watchers.push.apply(watchers, cur.watchers); } - if (notify && cur.key_watchers) - { - watchers.push.apply(watchers, cur.key_watchers); - } return { watchers, cur }; } + // create a snapshot of all data including leases + dump(persistent_only) + { + const snapshot = { + state: this._copy_tree(this.state, persistent_only) || {}, + mod_revision: this.mod_revision, + }; + if (!persistent_only) + { + snapshot.leases = {}; + for (const id in this.leases) + { + const lease = this.leases[id]; + snapshot.leases[id] = { ttl: lease.ttl, expires: lease.expires }; + } + } + return snapshot; + } + + _copy_tree(cur, no_lease) + { + const nonempty = cur.value != null && (!no_lease || !copy.lease); + const copy = (nonempty ? { ...cur } : {}); + copy.children = {}; + delete copy.watchers; + delete copy.key_watchers; + let has_children = false; + for (const k in cur.children) + { + const child = this._copy_tree(cur.children[k]); + if (child) + { + copy.children[k] = child; + has_children = true; + } + } + if (!nonempty && !has_children) + { + return null; + } + if (!has_children) + { + delete copy.children; + } + return copy; + } + + // load snapshot of all data including leases + load(snapshot) + { + this.mod_revision = snapshot.mod_revision; + // First apply leases + for (const id in this.leases) + { + if (!snapshot.leases[id]) + { + this.api_revoke_lease(id); + } + } + for (const id in snapshot.leases) + { + if (!this.leases[id]) + { + this.leases[id] = { ...snapshot.leases[id], timer_id: null, keys: {} }; + } + else if (this.leases[id].ttl != snapshot.leases[id].ttl || + this.leases[id].expires != snapshot.leases[id].expires) + { + this.leases[id].ttl = snapshot.leases[id].ttl; + this.leases[id].expires = snapshot.leases[id].expires; + } + else + { + continue; + } + if (this.leases[id].timer_id) + { + clearTimeout(this.leases[id].timer_id); + this.leases[id].timer_id = null; + } + if (!this.paused) + { + this.leases[id].timer_id = setTimeout(() => this.expire_lease(id), this.leases[id].expires - Date.now()); + } + } + // Then find and apply the difference in data + const notifications = []; + this._restore_diff(this.state, snapshot.state, null, this.state.watchers || [], notifications); + this.notify(notifications); + } + + _restore_diff(cur_old, cur_new, prefix, watchers, notifications) + { + const key = prefix === null ? '' : prefix; + if (!eq(cur_old.lease, cur_new.lease)) + { + if (cur_old.lease && this.leases[cur_old.lease]) + { + delete this.leases[cur_old.lease].keys[key]; + } + cur_old.lease = cur_new.lease; + if (cur_new.lease && this.leases[cur_new.lease]) + { + this.leases[cur_new.lease].keys[key] = true; + } + } + cur_old.mod_revision = cur_new.mod_revision; + cur_old.create_revision = cur_new.create_revision; + cur_old.version = cur_new.version; + if (!eq(cur_old.value, cur_new.value)) + { + cur_old.value = cur_new.value; + for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers)) + { + const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision }; + if (cur_old.lease) + { + notify.lease = cur_old.lease; + } + notifications.push(notify); + } + } + cur_old.children ||= {}; + for (const k in cur_new.children) + { + if (!cur_old.children[k]) + { + cur_old.children[k] = cur_new.children[k]; + } + else + { + this._restore_diff( + cur_old.children[k], cur_new.children[k], + prefix === null ? k : prefix+'/'+k, + cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers, + notifications + ); + } + } + for (const k in cur_old.children) + { + if (!cur_new.children[k]) + { + // Delete subtree + this.delete_all( + notifications, + cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers, + cur_old.children[k], true, + prefix === null ? k : prefix+'/'+k, + this.mod_revision + ); + } + } + } + + // slave/follower nodes don't expire leases themselves, they listen for the leader instead + pause_leases() + { + this.paused = true; + for (const id in this.leases) + { + const lease = this.leases[id]; + if (lease.timer_id) + { + clearTimeout(lease.timer_id); + lease.timer_id = null; + } + } + } + + resume_leases() + { + this.paused = false; + for (const id in this.leases) + { + const lease = this.leases[id]; + if (!lease.timer_id) + { + lease.timer_id = setTimeout(() => this.expire_lease(id), lease.expires - Date.now()); + } + } + } + + set_on_expire_lease(cb) + { + this.on_expire_lease = cb; + } + api_grant_lease(req) { let id; @@ -143,8 +329,9 @@ class EtcTree { id = crypto.randomBytes(8).toString('hex'); } - const timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), req.TTL*1000); - this.leases[id] = { ttl: req.TTL, timer_id, keys: {} }; + const expires = Date.now() + req.TTL*1000; + const timer_id = this.paused ? null : setTimeout(() => this.expire_lease(id), req.TTL*1000); + this.leases[id] = { ttl: req.TTL, expires, timer_id, keys: {} }; return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL }; } @@ -155,13 +342,31 @@ class EtcTree { throw new Error('unknown lease'); } - clearTimeout(this.leases[id].timer_id); + const lease = this.leases[id]; + if (lease.timer_id) + { + clearTimeout(lease.timer_id); + lease.timer_id = null; + } const ttl = this.leases[id].ttl; - this.leases[id].timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), ttl*1000); + lease.expires = Date.now() + ttl*1000; + if (!this.paused) + { + lease.timer_id = setTimeout(() => this.expire_lease(id), ttl*1000); + } // extra wrapping in { result: ... } return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } }; } + expire_lease(id) + { + this.api_revoke_lease({ ID: id }) + if (this.on_expire_lease) + { + this.on_expire_lease(id); + } + } + api_revoke_lease(req) { if (!this.leases[req.ID]) @@ -344,6 +549,10 @@ class EtcTree const key = parts.join('/'); const value = this.de64(request_put.value); const { cur, watchers } = this.get_subtree(parts, true, true); + if (cur.key_watchers) + { + watchers.push.apply(watchers, cur.key_watchers); + } if (!eq(cur.value, value) || cur.lease != request_put.lease) { if (cur.lease && this.leases[cur.lease]) @@ -359,6 +568,10 @@ class EtcTree cur.lease = request_put.lease; this.leases[request_put.lease].keys[key] = true; } + else if (cur.lease) + { + cur.lease = null; + } this.mod_revision = cur_revision; cur.version = (cur.version||0) + 1; cur.mod_revision = cur_revision; @@ -400,14 +613,19 @@ class EtcTree } if (cur.value != null) { - if (req.keys_only) + const item = { key: this.b64(prefix === null ? '' : prefix) }; + if (!req.keys_only) { - kvs.push({ key: this.b64(prefix === null ? '' : prefix), mod_revision: cur.mod_revision }); - } - else - { - kvs.push({ key: this.b64(prefix === null ? '' : prefix), value: this.b64(cur.value), mod_revision: cur.mod_revision }); + item.value = this.b64(cur.value); + item.mod_revision = cur.mod_revision; + //item.create_revision = cur.create_revision; + //item.version = cur.version; + if (cur.lease) + { + item.lease = cur.lease; + } } + kvs.push(item); } if (all && cur.children) { @@ -424,12 +642,20 @@ class EtcTree { // Do not actually forget the key until the deletion is confirmed by all replicas // ...and until it's not required by watchers + if (cur.lease && this.leases[cur.lease]) + { + delete this.leases[cur.lease].keys[prefix === null ? '' : prefix]; + } cur.value = null; cur.version = 0; cur.create_revision = null; cur.mod_revision = cur_revision; this.mod_revision = cur_revision; - notifications.push({ watchers, key: this.b64(prefix === null ? '' : prefix), mod_revision: cur_revision }); + notifications.push({ + watchers: cur.key_watchers ? [ ...watchers, ...cur.key_watchers ] : watchers, + key: this.b64(prefix === null ? '' : prefix), + mod_revision: cur_revision, + }); } if (all && cur.children) { diff --git a/etctree.spec.js b/etctree.spec.js index 4312434..6a40c1d 100644 --- a/etctree.spec.js +++ b/etctree.spec.js @@ -13,7 +13,7 @@ const expect = (a, b) => } }; -tests['read/write'] = () => +tests['read/write'] = async () => { const t = new EtcTree(); expect( @@ -59,9 +59,13 @@ tests['read/write'] = () => kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ], } } ] } ); + expect( + t.dump(false), + {"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}} + ); }; -tests['watch'] = () => +tests['watch'] = async () => { const t = new EtcTree(); const sent = []; @@ -98,12 +102,25 @@ tests['lease'] = async () => { watch_id: 1, created: true } ); expect(sent, []); + const dump = t.dump(false); + const expires = dump.leases[leaseID].expires; + expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":1,"mod_revision":1,"create_revision":1,"value":{"ip":"1.2.3.4"}}}}}}}}}}}},"mod_revision":1,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}}); await new Promise(ok => setTimeout(ok, 600)); expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'DELETE', kv: { key: '/vitastor/osd/state/1', mod_revision: 2 } } ] } } ]); + t.pause_leases(); + t.load(dump); + expect(t.dump(false), dump); + const t2 = new EtcTree(); + t2.pause_leases(); + t2.load(dump); + expect(t2.dump(false), dump); }; -for (cur_test in tests) +(async function() { - tests[cur_test](); - console.log(cur_test+' test: OK'); -} + for (cur_test in tests) + { + await tests[cur_test](); + console.log(cur_test+' test: OK'); + } +})().catch(console.error);