Implement deletion compaction
parent
325c2bb2d9
commit
73f0e809e8
|
@ -81,6 +81,23 @@ class AntiCluster
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const mod_revision = this.antietcd.etctree.mod_revision;
|
||||||
|
await this._requestFollowers({ replicate: msg }, this.cfg.replication_timeout||1000);
|
||||||
|
// We have a guarantee that all revisions before mod_revision are applied by followers,
|
||||||
|
// because replication messages are either processed synchronously or serialized in
|
||||||
|
// AntiPersistence against <wait_persist>
|
||||||
|
this.sync_revision = mod_revision;
|
||||||
|
if (this.sync_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
|
||||||
|
{
|
||||||
|
const revision = this.sync_revision - (this.cfg.compact_revisions||1000);
|
||||||
|
await this._requestFollowers({ compact: { revision } }, this.cfg.compact_timeout||1000);
|
||||||
|
this.antietcd.etctree.compact(revision);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _requestFollowers(msg, timeout)
|
||||||
|
{
|
||||||
|
msg.term = this.raft.term;
|
||||||
for (const follower of this.raft.followers)
|
for (const follower of this.raft.followers)
|
||||||
{
|
{
|
||||||
if (follower != this.cfg.node_id)
|
if (follower != this.cfg.node_id)
|
||||||
|
@ -101,7 +118,7 @@ class AntiCluster
|
||||||
if (follower != this.cfg.node_id)
|
if (follower != this.cfg.node_id)
|
||||||
{
|
{
|
||||||
const client = this._getPeer(follower);
|
const client = this._getPeer(follower);
|
||||||
const promise = this._peerRequest(client, { replicate: msg, term: this.raft.term }, this.cfg.replication_timeout||1000);
|
const promise = this._peerRequest(client, msg, timeout);
|
||||||
promises.push(promise);
|
promises.push(promise);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -369,6 +386,10 @@ class AntiCluster
|
||||||
{
|
{
|
||||||
this._completeRequest(client.id, msg.request_id, msg.reply);
|
this._completeRequest(client.id, msg.request_id, msg.reply);
|
||||||
}
|
}
|
||||||
|
else if (msg.compact)
|
||||||
|
{
|
||||||
|
this._handleCompactMsg(client, msg);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _handleRequestMsg(client, msg)
|
async _handleRequestMsg(client, msg)
|
||||||
|
@ -429,6 +450,21 @@ class AntiCluster
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_handleCompactMsg(client, msg)
|
||||||
|
{
|
||||||
|
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
|
||||||
|
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
|
||||||
|
{
|
||||||
|
this.antietcd.etctree.compact(msg.compact.revision);
|
||||||
|
console.log('Compacted deletions up to '+msg.compact.revision);
|
||||||
|
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
_getPeer(to)
|
_getPeer(to)
|
||||||
{
|
{
|
||||||
if (to == this.cfg.node_id)
|
if (to == this.cfg.node_id)
|
||||||
|
|
|
@ -502,6 +502,10 @@ Clustering:
|
||||||
Timeout for forwarding requests from follower to leader in milliseconds
|
Timeout for forwarding requests from follower to leader in milliseconds
|
||||||
--replication_timeout 1000
|
--replication_timeout 1000
|
||||||
Timeout for replicating requests from leader to follower in milliseconds
|
Timeout for replicating requests from leader to follower in milliseconds
|
||||||
|
--compact_revisions 1000
|
||||||
|
Number of previous revisions to keep deletion information in memory
|
||||||
|
--compact_timeout 1000
|
||||||
|
Timeout for compaction requests from leader to follower in milliseconds
|
||||||
`;
|
`;
|
||||||
|
|
||||||
function parse()
|
function parse()
|
||||||
|
|
39
etctree.js
39
etctree.js
|
@ -21,6 +21,7 @@ class EtcTree
|
||||||
this.watchers = {};
|
this.watchers = {};
|
||||||
this.watcher_id = 0;
|
this.watcher_id = 0;
|
||||||
this.mod_revision = 0;
|
this.mod_revision = 0;
|
||||||
|
this.compact_revision = 0;
|
||||||
this.use_base64 = use_base64;
|
this.use_base64 = use_base64;
|
||||||
this.replicate = null;
|
this.replicate = null;
|
||||||
this.paused = false;
|
this.paused = false;
|
||||||
|
@ -159,6 +160,7 @@ class EtcTree
|
||||||
const snapshot = {
|
const snapshot = {
|
||||||
state: this._copy_tree(this.state, persistent_only, value_filter) || {},
|
state: this._copy_tree(this.state, persistent_only, value_filter) || {},
|
||||||
mod_revision: this.mod_revision,
|
mod_revision: this.mod_revision,
|
||||||
|
compact_revision: this.compact_revision,
|
||||||
};
|
};
|
||||||
if (!persistent_only)
|
if (!persistent_only)
|
||||||
{
|
{
|
||||||
|
@ -217,6 +219,10 @@ class EtcTree
|
||||||
{
|
{
|
||||||
this.mod_revision = snapshot.mod_revision;
|
this.mod_revision = snapshot.mod_revision;
|
||||||
}
|
}
|
||||||
|
if (!update_only || this.compact_revision > (snapshot.compact_revision||0))
|
||||||
|
{
|
||||||
|
this.compact_revision = snapshot.compact_revision||0;
|
||||||
|
}
|
||||||
// First apply leases
|
// First apply leases
|
||||||
const notifications = [];
|
const notifications = [];
|
||||||
if (!update_only && snapshot.leases)
|
if (!update_only && snapshot.leases)
|
||||||
|
@ -500,6 +506,26 @@ class EtcTree
|
||||||
this._notify(notifications);
|
this._notify(notifications);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// forget deletions before compact_revision
|
||||||
|
compact(compact_revision)
|
||||||
|
{
|
||||||
|
this._compact(compact_revision, this.state);
|
||||||
|
this.compact_revision = compact_revision;
|
||||||
|
}
|
||||||
|
|
||||||
|
_compact(compact_revision, cur)
|
||||||
|
{
|
||||||
|
for (const key in cur.children||{})
|
||||||
|
{
|
||||||
|
const child = cur.children[key];
|
||||||
|
this._compact(compact_revision, child);
|
||||||
|
if (emptyObj(child.children) && child.value == null && child.mod_revision < compact_revision)
|
||||||
|
{
|
||||||
|
delete cur.children[key];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
api_create_watch(req, send)
|
api_create_watch(req, send)
|
||||||
{
|
{
|
||||||
const { parts, all } = this._get_range(req);
|
const { parts, all } = this._get_range(req);
|
||||||
|
@ -819,6 +845,19 @@ function eq(a, b)
|
||||||
return a == b;
|
return a == b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function emptyObj(obj)
|
||||||
|
{
|
||||||
|
if (!obj)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
for (const k in obj)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
EtcTree.eq = eq;
|
EtcTree.eq = eq;
|
||||||
|
|
||||||
module.exports = EtcTree;
|
module.exports = EtcTree;
|
||||||
|
|
Loading…
Reference in New Issue