Implement deletion compaction

master
Vitaliy Filippov 2024-05-08 13:51:26 +03:00
parent 325c2bb2d9
commit 1734b422cb
3 changed files with 80 additions and 2 deletions

View File

@ -81,6 +81,23 @@ class AntiCluster
{
return;
}
const mod_revision = this.antietcd.etctree.mod_revision;
await this._requestFollowers({ replicate: msg }, this.cfg.replication_timeout||1000);
// We have a guarantee that all revisions before mod_revision are applied by followers,
// because replication messages are either processed synchronously or serialized in
// AntiPersistence against <wait_persist>
this.sync_revision = mod_revision;
if (this.sync_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{
const revision = this.sync_revision - (this.cfg.compact_revisions||1000);
await this._requestFollowers({ compact: { revision } }, this.cfg.compact_timeout||1000);
this.antietcd.etctree.compact(revision);
}
}
async _requestFollowers(msg, timeout)
{
msg.term = this.raft.term;
for (const follower of this.raft.followers)
{
if (follower != this.cfg.node_id)
@ -101,7 +118,7 @@ class AntiCluster
if (follower != this.cfg.node_id)
{
const client = this._getPeer(follower);
const promise = this._peerRequest(client, { replicate: msg, term: this.raft.term }, this.cfg.replication_timeout||1000);
const promise = this._peerRequest(client, msg, timeout);
promises.push(promise);
}
}
@ -369,6 +386,10 @@ class AntiCluster
{
this._completeRequest(client.id, msg.request_id, msg.reply);
}
else if (msg.compact)
{
this._handleCompactMsg(client, msg);
}
}
async _handleRequestMsg(client, msg)
@ -429,6 +450,21 @@ class AntiCluster
}
}
_handleCompactMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
this.antietcd.etctree.compact(msg.compact.revision);
console.log('Compacted deletions up to '+msg.compact.revision);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
_getPeer(to)
{
if (to == this.cfg.node_id)

View File

@ -502,6 +502,10 @@ Clustering:
Timeout for forwarding requests from follower to leader in milliseconds
--replication_timeout 1000
Timeout for replicating requests from leader to follower in milliseconds
--compact_revisions 1000
Number of previous revisions to keep deletion information in memory
--compact_timeout 1000
Timeout for compaction requests from leader to follower in milliseconds
`;
function parse()

View File

@ -21,6 +21,7 @@ class EtcTree
this.watchers = {};
this.watcher_id = 0;
this.mod_revision = 0;
this.compact_revision = 0;
this.use_base64 = use_base64;
this.replicate = null;
this.paused = false;
@ -159,6 +160,7 @@ class EtcTree
const snapshot = {
state: this._copy_tree(this.state, persistent_only, value_filter) || {},
mod_revision: this.mod_revision,
compact_revision: this.compact_revision,
};
if (!persistent_only)
{
@ -217,6 +219,10 @@ class EtcTree
{
this.mod_revision = snapshot.mod_revision;
}
if (!update_only || this.compact_revision > (snapshot.compact_revision||0))
{
this.compact_revision = snapshot.compact_revision||0;
}
// First apply leases
const notifications = [];
if (!update_only && snapshot.leases)
@ -500,6 +506,26 @@ class EtcTree
this._notify(notifications);
}
// forget deletions before compact_revision
compact(compact_revision)
{
this._compact(compact_revision, this.state);
this.compact_revision = compact_revision;
}
_compact(compact_revision, cur)
{
for (const key in cur.children||{})
{
const child = cur.children[key];
this._compact(compact_revision, child);
if (emptyObj(child.children) && child.value == null && child.mod_revision < compact_revision)
{
delete cur.children[key];
}
}
}
api_create_watch(req, send)
{
const { parts, all } = this._get_range(req);
@ -600,7 +626,6 @@ class EtcTree
if (!cur.key_watchers.length)
cur.key_watchers = null;
}
// FIXME: cleanup deleted tree paths
}
}
delete this.watchers[watch_id];
@ -819,6 +844,19 @@ function eq(a, b)
return a == b;
}
function emptyObj(obj)
{
if (!obj)
{
return true;
}
for (const k in obj)
{
return false;
}
return true;
}
EtcTree.eq = eq;
module.exports = EtcTree;