diff --git a/etctree.js b/etctree.js index 0bc3b1f..f04dbb6 100644 --- a/etctree.js +++ b/etctree.js @@ -22,7 +22,27 @@ class EtcTree this.watcher_id = 0; this.mod_revision = 0; this.use_base64 = use_base64; + this.replicate = null; this.paused = false; + this.active_immediate = []; + } + + destroy() + { + this.pause_leases(); + for (const imm of this.active_immediate) + { + clearImmediate(imm); + } + } + + set_replicate_watcher(replicate) + { + // Replication watcher is special: + // It should be an async function and it is called BEFORE notifying all + // other watchers about any change. + // It may also throw to prevent notifying at all if replication fails. + this.replicate = replicate; } de64(k) @@ -326,12 +346,12 @@ class EtcTree const lease = this.leases[id]; if (!lease.timer_id) { - lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), lease.expires - Date.now()); + lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }).catch(console.error), lease.expires - Date.now()); } } } - api_grant_lease(req) + async api_grant_lease(req) { let id; while (!id || this.leases[id]) @@ -341,10 +361,14 @@ class EtcTree const expires = Date.now() + req.TTL*1000; this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} }; this._set_expire(id); + if (this.replicate) + { + await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl: req.TTL, expires } ] }); + } return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL }; } - api_keepalive_lease(req) + async api_keepalive_lease(req) { const id = req.ID; if (!this.leases[id]) @@ -360,6 +384,10 @@ class EtcTree const ttl = this.leases[id].ttl; lease.expires = Date.now() + ttl*1000; this._set_expire(id); + if (this.replicate) + { + await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl, expires: lease.expires } ] }); + } // extra wrapping in { result: ... } return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } }; } @@ -404,18 +432,74 @@ class EtcTree delete this.leases[id]; } - api_revoke_lease(req) + async api_revoke_lease(req, no_throw) { const notifications = []; if (!this.leases[req.ID]) { + if (no_throw) + return null; throw new Error('unknown lease'); } this._sync_revoke_lease(req.ID, notifications); + if (this.replicate) + { + await this.notify_replicator(notifications, [ { id: req.ID } ]); + } this._notify(notifications); return { header: { revision: this.mod_revision } }; } + async notify_replicator(notifications, leases) + { + // First replicate the change and then notify watchers about it + const all_changes = {}; + for (const chg of notifications) + { + all_changes[chg.key] = { ...chg }; + delete all_changes[chg.key].watchers; + } + await this.replicate({ header: { revision: this.mod_revision }, events: Object.values(all_changes), leases }); + } + + async apply_replication(msg) + { + const notifications = []; + if ((msg.leases||[]).length) + { + for (const lease of msg.leases) + { + if (lease.ttl) + { + this.load_lease(lease); + } + else + { + this._sync_revoke_lease(lease.id, notifications); + } + } + } + if ((msg.events||[]).length) + { + for (const ev of msg.events) + { + if (ev.value == null) + { + this._delete_range({ key: ev.key }, ev.mod_revision, notifications); + } + else + { + this._put({ key: ev.key, value: ev.value }, ev.mod_revision, notifications); + } + } + } + if (this.replicate) + { + await this.notify_replicator(notifications, msg.leases); + } + this._notify(notifications); + } + api_create_watch(req, send) { const { parts, all } = this._get_range(req); @@ -455,13 +539,15 @@ class EtcTree if (req.start_revision && req.start_revision < this.mod_revision) { // Send initial changes - setImmediate(() => + const imm = setImmediate(() => { + this.active_immediate = this.active_immediate.filter(i => i !== imm); const events = []; const { cur } = this._get_subtree([], false, false); this._get_modified(events, cur, null, req.start_revision); send({ result: { header: { revision: this.mod_revision }, events } }); }); + this.active_immediate.push(imm); } return { watch_id, created: true }; } @@ -550,7 +636,7 @@ class EtcTree } } - api_txn({ compare, success, failure }) + async api_txn({ compare, success, failure }) { const failed = (compare || []).filter(chk => !this._check(chk)).length > 0; const responses = []; @@ -560,6 +646,11 @@ class EtcTree { responses.push(this._txn_action(req, next_revision, notifications)); } + if (this.replicate && notifications.length) + { + // First replicate the change and then notify watchers about it + await this.notify_replicator(notifications); + } this._notify(notifications); return { header: { revision: this.mod_revision }, succeeded: !failed, responses }; } diff --git a/etctree.spec.js b/etctree.spec.js index 6a40c1d..9a73888 100644 --- a/etctree.spec.js +++ b/etctree.spec.js @@ -17,27 +17,27 @@ tests['read/write'] = async () => { const t = new EtcTree(); expect( - t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }), + await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] } ); expect( - t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }), + await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ], } } ] } ); expect( - t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }), + await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ], } } ] } ); expect( - t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }), + await t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [] } } ] } ); expect( - t.api_txn({ + await t.api_txn({ compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 1, result: 'LESS' } ], success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ], failure: [ { request_range: { key: '/vitastor/config/global' } } ], @@ -47,14 +47,14 @@ tests['read/write'] = async () => } } ] } ); expect( - t.api_txn({ + await t.api_txn({ compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 2, result: 'LESS' } ], success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world2' } } } ] }), { header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] } ); expect( - t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }), + await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }), { header: { revision: 2 }, succeeded: true, responses: [ { response_range: { kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ], } } ] } @@ -63,6 +63,7 @@ tests['read/write'] = async () => t.dump(false), {"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}} ); + t.destroy(); }; tests['watch'] = async () => @@ -71,7 +72,7 @@ tests['watch'] = async () => const sent = []; const send = (event) => sent.push(event); expect( - t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }), + await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] } ); expect( @@ -80,10 +81,11 @@ tests['watch'] = async () => ); expect(sent, []); expect( - t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }), + await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }), { header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] } ); expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'PUT', kv: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' }, mod_revision: 2 } } ] } } ]); + t.destroy(); }; tests['lease'] = async () => @@ -91,10 +93,10 @@ tests['lease'] = async () => const t = new EtcTree(); const sent = []; const send = (event) => sent.push(event); - const leaseID = t.api_grant_lease({ TTL: 0.5 }).ID; + const leaseID = (await t.api_grant_lease({ TTL: 0.5 })).ID; expect(leaseID != null, true); expect( - t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }), + await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }), { header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] } ); expect( @@ -114,6 +116,57 @@ tests['lease'] = async () => t2.pause_leases(); t2.load(dump); expect(t2.dump(false), dump); + t.destroy(); + t2.destroy(); +}; + +tests['update'] = async () => +{ + const t1 = new EtcTree(); + const t2 = new EtcTree(); + const leaseID = (await t1.api_grant_lease({ TTL: 0.5 })).ID; + expect(leaseID != null, true); + expect( + await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }), + { header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] } + ); + expect( + await t2.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.6' } } } ] }), + { header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] } + ); + expect( + await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.5' } } } ] }), + { header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] } + ); + let dump2 = t2.dump(); + t2.load(t1.dump(), true); + t1.load(dump2, true); + let dump = t2.dump(false); + let expires = dump.leases[leaseID].expires; + expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}}); + expect(t1.dump(false), {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}}); + t1.destroy(); + t2.destroy(); +}; + +tests['replicate watcher'] = async () => +{ + const t = new EtcTree(); + t.set_replicate_watcher(async () => + { + throw new Error('replication failed'); + }); + let thrown = false; + try + { + await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }); + } + catch (e) + { + thrown = e; + } + expect(thrown && thrown.message == 'replication failed', true); + t.destroy(); }; (async function()