Implement simple dump/load and lease pausing

de64
Vitaliy Filippov 2024-05-01 01:56:38 +03:00
parent 0947d0d61a
commit b559f9b555
2 changed files with 264 additions and 21 deletions

View File

@ -22,6 +22,8 @@ class EtcTree
this.watcher_id = 0;
this.mod_revision = 0;
this.use_base64 = use_base64;
this.paused = false;
this.on_expire_lease = null;
}
de64(k)
@ -129,13 +131,197 @@ class EtcTree
{
watchers.push.apply(watchers, cur.watchers);
}
if (notify && cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
return { watchers, cur };
}
// create a snapshot of all data including leases
dump(persistent_only)
{
const snapshot = {
state: this._copy_tree(this.state, persistent_only) || {},
mod_revision: this.mod_revision,
};
if (!persistent_only)
{
snapshot.leases = {};
for (const id in this.leases)
{
const lease = this.leases[id];
snapshot.leases[id] = { ttl: lease.ttl, expires: lease.expires };
}
}
return snapshot;
}
_copy_tree(cur, no_lease)
{
const nonempty = cur.value != null && (!no_lease || !copy.lease);
const copy = (nonempty ? { ...cur } : {});
copy.children = {};
delete copy.watchers;
delete copy.key_watchers;
let has_children = false;
for (const k in cur.children)
{
const child = this._copy_tree(cur.children[k]);
if (child)
{
copy.children[k] = child;
has_children = true;
}
}
if (!nonempty && !has_children)
{
return null;
}
if (!has_children)
{
delete copy.children;
}
return copy;
}
// load snapshot of all data including leases
load(snapshot)
{
this.mod_revision = snapshot.mod_revision;
// First apply leases
for (const id in this.leases)
{
if (!snapshot.leases[id])
{
this.api_revoke_lease(id);
}
}
for (const id in snapshot.leases)
{
if (!this.leases[id])
{
this.leases[id] = { ...snapshot.leases[id], timer_id: null, keys: {} };
}
else if (this.leases[id].ttl != snapshot.leases[id].ttl ||
this.leases[id].expires != snapshot.leases[id].expires)
{
this.leases[id].ttl = snapshot.leases[id].ttl;
this.leases[id].expires = snapshot.leases[id].expires;
}
else
{
continue;
}
if (this.leases[id].timer_id)
{
clearTimeout(this.leases[id].timer_id);
this.leases[id].timer_id = null;
}
if (!this.paused)
{
this.leases[id].timer_id = setTimeout(() => this.expire_lease(id), this.leases[id].expires - Date.now());
}
}
// Then find and apply the difference in data
const notifications = [];
this._restore_diff(this.state, snapshot.state, null, this.state.watchers || [], notifications);
this.notify(notifications);
}
_restore_diff(cur_old, cur_new, prefix, watchers, notifications)
{
const key = prefix === null ? '' : prefix;
if (!eq(cur_old.lease, cur_new.lease))
{
if (cur_old.lease && this.leases[cur_old.lease])
{
delete this.leases[cur_old.lease].keys[key];
}
cur_old.lease = cur_new.lease;
if (cur_new.lease && this.leases[cur_new.lease])
{
this.leases[cur_new.lease].keys[key] = true;
}
}
cur_old.mod_revision = cur_new.mod_revision;
cur_old.create_revision = cur_new.create_revision;
cur_old.version = cur_new.version;
if (!eq(cur_old.value, cur_new.value))
{
cur_old.value = cur_new.value;
for (const w of (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers))
{
const notify = { watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision };
if (cur_old.lease)
{
notify.lease = cur_old.lease;
}
notifications.push(notify);
}
}
cur_old.children ||= {};
for (const k in cur_new.children)
{
if (!cur_old.children[k])
{
cur_old.children[k] = cur_new.children[k];
}
else
{
this._restore_diff(
cur_old.children[k], cur_new.children[k],
prefix === null ? k : prefix+'/'+k,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
notifications
);
}
}
for (const k in cur_old.children)
{
if (!cur_new.children[k])
{
// Delete subtree
this.delete_all(
notifications,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
cur_old.children[k], true,
prefix === null ? k : prefix+'/'+k,
this.mod_revision
);
}
}
}
// slave/follower nodes don't expire leases themselves, they listen for the leader instead
pause_leases()
{
this.paused = true;
for (const id in this.leases)
{
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
}
}
resume_leases()
{
this.paused = false;
for (const id in this.leases)
{
const lease = this.leases[id];
if (!lease.timer_id)
{
lease.timer_id = setTimeout(() => this.expire_lease(id), lease.expires - Date.now());
}
}
}
set_on_expire_lease(cb)
{
this.on_expire_lease = cb;
}
api_grant_lease(req)
{
let id;
@ -143,8 +329,9 @@ class EtcTree
{
id = crypto.randomBytes(8).toString('hex');
}
const timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), req.TTL*1000);
this.leases[id] = { ttl: req.TTL, timer_id, keys: {} };
const expires = Date.now() + req.TTL*1000;
const timer_id = this.paused ? null : setTimeout(() => this.expire_lease(id), req.TTL*1000);
this.leases[id] = { ttl: req.TTL, expires, timer_id, keys: {} };
return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL };
}
@ -155,13 +342,31 @@ class EtcTree
{
throw new Error('unknown lease');
}
clearTimeout(this.leases[id].timer_id);
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
const ttl = this.leases[id].ttl;
this.leases[id].timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }), ttl*1000);
lease.expires = Date.now() + ttl*1000;
if (!this.paused)
{
lease.timer_id = setTimeout(() => this.expire_lease(id), ttl*1000);
}
// extra wrapping in { result: ... }
return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } };
}
expire_lease(id)
{
this.api_revoke_lease({ ID: id })
if (this.on_expire_lease)
{
this.on_expire_lease(id);
}
}
api_revoke_lease(req)
{
if (!this.leases[req.ID])
@ -344,6 +549,10 @@ class EtcTree
const key = parts.join('/');
const value = this.de64(request_put.value);
const { cur, watchers } = this.get_subtree(parts, true, true);
if (cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
if (!eq(cur.value, value) || cur.lease != request_put.lease)
{
if (cur.lease && this.leases[cur.lease])
@ -359,6 +568,10 @@ class EtcTree
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;
}
else if (cur.lease)
{
cur.lease = null;
}
this.mod_revision = cur_revision;
cur.version = (cur.version||0) + 1;
cur.mod_revision = cur_revision;
@ -400,14 +613,19 @@ class EtcTree
}
if (cur.value != null)
{
if (req.keys_only)
const item = { key: this.b64(prefix === null ? '' : prefix) };
if (!req.keys_only)
{
kvs.push({ key: this.b64(prefix === null ? '' : prefix), mod_revision: cur.mod_revision });
}
else
{
kvs.push({ key: this.b64(prefix === null ? '' : prefix), value: this.b64(cur.value), mod_revision: cur.mod_revision });
item.value = this.b64(cur.value);
item.mod_revision = cur.mod_revision;
//item.create_revision = cur.create_revision;
//item.version = cur.version;
if (cur.lease)
{
item.lease = cur.lease;
}
}
kvs.push(item);
}
if (all && cur.children)
{
@ -424,12 +642,20 @@ class EtcTree
{
// Do not actually forget the key until the deletion is confirmed by all replicas
// ...and until it's not required by watchers
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[prefix === null ? '' : prefix];
}
cur.value = null;
cur.version = 0;
cur.create_revision = null;
cur.mod_revision = cur_revision;
this.mod_revision = cur_revision;
notifications.push({ watchers, key: this.b64(prefix === null ? '' : prefix), mod_revision: cur_revision });
notifications.push({
watchers: cur.key_watchers ? [ ...watchers, ...cur.key_watchers ] : watchers,
key: this.b64(prefix === null ? '' : prefix),
mod_revision: cur_revision,
});
}
if (all && cur.children)
{

View File

@ -13,7 +13,7 @@ const expect = (a, b) =>
}
};
tests['read/write'] = () =>
tests['read/write'] = async () =>
{
const t = new EtcTree();
expect(
@ -59,9 +59,13 @@ tests['read/write'] = () =>
kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ],
} } ] }
);
expect(
t.dump(false),
{"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}}
);
};
tests['watch'] = () =>
tests['watch'] = async () =>
{
const t = new EtcTree();
const sent = [];
@ -98,12 +102,25 @@ tests['lease'] = async () =>
{ watch_id: 1, created: true }
);
expect(sent, []);
const dump = t.dump(false);
const expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":1,"mod_revision":1,"create_revision":1,"value":{"ip":"1.2.3.4"}}}}}}}}}}}},"mod_revision":1,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
await new Promise(ok => setTimeout(ok, 600));
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'DELETE', kv: { key: '/vitastor/osd/state/1', mod_revision: 2 } } ] } } ]);
t.pause_leases();
t.load(dump);
expect(t.dump(false), dump);
const t2 = new EtcTree();
t2.pause_leases();
t2.load(dump);
expect(t2.dump(false), dump);
};
for (cur_test in tests)
(async function()
{
tests[cur_test]();
console.log(cur_test+' test: OK');
}
for (cur_test in tests)
{
await tests[cur_test]();
console.log(cur_test+' test: OK');
}
})().catch(console.error);