Implement special replication listener for etctree

master
Vitaliy Filippov 2024-05-07 15:32:01 +03:00
parent 2692f4abc9
commit e8b600f536
2 changed files with 161 additions and 17 deletions

View File

@ -22,7 +22,27 @@ class EtcTree
this.watcher_id = 0;
this.mod_revision = 0;
this.use_base64 = use_base64;
this.replicate = null;
this.paused = false;
this.active_immediate = [];
}
destroy()
{
this.pause_leases();
for (const imm of this.active_immediate)
{
clearImmediate(imm);
}
}
set_replicate_watcher(replicate)
{
// Replication watcher is special:
// It should be an async function and it is called BEFORE notifying all
// other watchers about any change.
// It may also throw to prevent notifying at all if replication fails.
this.replicate = replicate;
}
de64(k)
@ -326,12 +346,12 @@ class EtcTree
const lease = this.leases[id];
if (!lease.timer_id)
{
lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), lease.expires - Date.now());
lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }).catch(console.error), lease.expires - Date.now());
}
}
}
api_grant_lease(req)
async api_grant_lease(req)
{
let id;
while (!id || this.leases[id])
@ -341,10 +361,14 @@ class EtcTree
const expires = Date.now() + req.TTL*1000;
this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} };
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl: req.TTL, expires } ] });
}
return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL };
}
api_keepalive_lease(req)
async api_keepalive_lease(req)
{
const id = req.ID;
if (!this.leases[id])
@ -360,6 +384,10 @@ class EtcTree
const ttl = this.leases[id].ttl;
lease.expires = Date.now() + ttl*1000;
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl, expires: lease.expires } ] });
}
// extra wrapping in { result: ... }
return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } };
}
@ -404,18 +432,74 @@ class EtcTree
delete this.leases[id];
}
api_revoke_lease(req)
async api_revoke_lease(req, no_throw)
{
const notifications = [];
if (!this.leases[req.ID])
{
if (no_throw)
return null;
throw new Error('unknown lease');
}
this._sync_revoke_lease(req.ID, notifications);
if (this.replicate)
{
await this.notify_replicator(notifications, [ { id: req.ID } ]);
}
this._notify(notifications);
return { header: { revision: this.mod_revision } };
}
async notify_replicator(notifications, leases)
{
// First replicate the change and then notify watchers about it
const all_changes = {};
for (const chg of notifications)
{
all_changes[chg.key] = { ...chg };
delete all_changes[chg.key].watchers;
}
await this.replicate({ header: { revision: this.mod_revision }, events: Object.values(all_changes), leases });
}
async apply_replication(msg)
{
const notifications = [];
if ((msg.leases||[]).length)
{
for (const lease of msg.leases)
{
if (lease.ttl)
{
this.load_lease(lease);
}
else
{
this._sync_revoke_lease(lease.id, notifications);
}
}
}
if ((msg.events||[]).length)
{
for (const ev of msg.events)
{
if (ev.value == null)
{
this._delete_range({ key: ev.key }, ev.mod_revision, notifications);
}
else
{
this._put({ key: ev.key, value: ev.value }, ev.mod_revision, notifications);
}
}
}
if (this.replicate)
{
await this.notify_replicator(notifications, msg.leases);
}
this._notify(notifications);
}
api_create_watch(req, send)
{
const { parts, all } = this._get_range(req);
@ -455,13 +539,15 @@ class EtcTree
if (req.start_revision && req.start_revision < this.mod_revision)
{
// Send initial changes
setImmediate(() =>
const imm = setImmediate(() =>
{
this.active_immediate = this.active_immediate.filter(i => i !== imm);
const events = [];
const { cur } = this._get_subtree([], false, false);
this._get_modified(events, cur, null, req.start_revision);
send({ result: { header: { revision: this.mod_revision }, events } });
});
this.active_immediate.push(imm);
}
return { watch_id, created: true };
}
@ -550,7 +636,7 @@ class EtcTree
}
}
api_txn({ compare, success, failure })
async api_txn({ compare, success, failure })
{
const failed = (compare || []).filter(chk => !this._check(chk)).length > 0;
const responses = [];
@ -560,6 +646,11 @@ class EtcTree
{
responses.push(this._txn_action(req, next_revision, notifications));
}
if (this.replicate && notifications.length)
{
// First replicate the change and then notify watchers about it
await this.notify_replicator(notifications);
}
this._notify(notifications);
return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
}

View File

@ -17,27 +17,27 @@ tests['read/write'] = async () =>
{
const t = new EtcTree();
expect(
t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }),
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
expect(
t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
expect(
t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }),
await t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [] } } ] }
);
expect(
t.api_txn({
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 1, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ],
failure: [ { request_range: { key: '/vitastor/config/global' } } ],
@ -47,14 +47,14 @@ tests['read/write'] = async () =>
} } ] }
);
expect(
t.api_txn({
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 2, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world2' } } } ]
}),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ],
} } ] }
@ -63,6 +63,7 @@ tests['read/write'] = async () =>
t.dump(false),
{"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}}
);
t.destroy();
};
tests['watch'] = async () =>
@ -71,7 +72,7 @@ tests['watch'] = async () =>
const sent = [];
const send = (event) => sent.push(event);
expect(
t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
@ -80,10 +81,11 @@ tests['watch'] = async () =>
);
expect(sent, []);
expect(
t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }),
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'PUT', kv: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' }, mod_revision: 2 } } ] } } ]);
t.destroy();
};
tests['lease'] = async () =>
@ -91,10 +93,10 @@ tests['lease'] = async () =>
const t = new EtcTree();
const sent = [];
const send = (event) => sent.push(event);
const leaseID = t.api_grant_lease({ TTL: 0.5 }).ID;
const leaseID = (await t.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
@ -114,6 +116,57 @@ tests['lease'] = async () =>
t2.pause_leases();
t2.load(dump);
expect(t2.dump(false), dump);
t.destroy();
t2.destroy();
};
tests['update'] = async () =>
{
const t1 = new EtcTree();
const t2 = new EtcTree();
const leaseID = (await t1.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t2.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.6' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.5' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
let dump2 = t2.dump();
t2.load(t1.dump(), true);
t1.load(dump2, true);
let dump = t2.dump(false);
let expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
expect(t1.dump(false), {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
t1.destroy();
t2.destroy();
};
tests['replicate watcher'] = async () =>
{
const t = new EtcTree();
t.set_replicate_watcher(async () =>
{
throw new Error('replication failed');
});
let thrown = false;
try
{
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] });
}
catch (e)
{
thrown = e;
}
expect(thrown && thrown.message == 'replication failed', true);
t.destroy();
};
(async function()