Compare commits

...

2 Commits

4 changed files with 36 additions and 8 deletions

View File

@ -226,7 +226,10 @@ class AntiCluster
} }
else else
{ {
this._log('Got dump from '+client.raft_node_id+' with stored term '+res.term); this._log(
'Got dump from '+client.raft_node_id+' with stored term '+res.term+
', mod_revision '+res.mod_revision+', compact_revision '+res.compact_revision
);
} }
this.resync_state.dumps[client.raft_node_id] = res.error ? null : res; this.resync_state.dumps[client.raft_node_id] = res.error ? null : res;
this._continueResync(); this._continueResync();
@ -325,7 +328,10 @@ class AntiCluster
this.antietcd.stored_term = this.raft.term; this.antietcd.stored_term = this.raft.term;
this.synced = true; this.synced = true;
runCallbacks(this, 'wait_sync', []); runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with followers, new term is '+this.raft.term); this._log(
'Synchronized with followers, new term is '+this.raft.term+
', mod_revision '+this.antietcd.etctree.mod_revision+', compact_revision '+this.antietcd.etctree.compact_revision
);
} }
_isWrite(path, data) _isWrite(path, data)
@ -452,7 +458,10 @@ class AntiCluster
this.antietcd.stored_term = msg.term; this.antietcd.stored_term = msg.term;
this.synced = true; this.synced = true;
runCallbacks(this, 'wait_sync', []); runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with leader, new term is '+msg.term); this._log(
'Synchronized with leader, new term is '+this.raft.term+
', mod_revision '+this.antietcd.etctree.mod_revision+', compact_revision '+this.antietcd.etctree.compact_revision
);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} })); client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
} }
else else

View File

@ -116,7 +116,7 @@ class AntiPersistence
this.wait_persist = []; this.wait_persist = [];
try try
{ {
let dump = this.antietcd.etctree.dump(true); let dump = this.antietcd.etctree.dump(true, this.cfg.persist_filter);
dump['term'] = this.antietcd.stored_term; dump['term'] = this.antietcd.stored_term;
dump = stableStringify(dump); dump = stableStringify(dump);
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res))); dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));

View File

@ -168,7 +168,7 @@ class EtcTree
dump(persistent_only, value_filter) dump(persistent_only, value_filter)
{ {
const snapshot = { const snapshot = {
state: this._copy_tree(this.state, persistent_only, value_filter) || {}, state: this._copy_tree(this.state, null, persistent_only, value_filter) || {},
mod_revision: this.mod_revision, mod_revision: this.mod_revision,
compact_revision: this.compact_revision, compact_revision: this.compact_revision,
}; };
@ -184,13 +184,13 @@ class EtcTree
return snapshot; return snapshot;
} }
_copy_tree(cur, no_lease, value_filter) _copy_tree(cur, key, no_lease, value_filter)
{ {
let nonempty = cur.value != null && (!no_lease || !cur.lease); let nonempty = cur.value != null && (!no_lease || !cur.lease);
let filtered; let filtered;
if (nonempty && value_filter) if (nonempty && value_filter)
{ {
filtered = value_filter(cur.value); filtered = value_filter(key === null ? '' : key, cur.value);
nonempty = nonempty && filtered != null; nonempty = nonempty && filtered != null;
} }
const copy = (nonempty ? { ...cur } : {}); const copy = (nonempty ? { ...cur } : {});
@ -204,7 +204,7 @@ class EtcTree
let has_children = false; let has_children = false;
for (const k in cur.children) for (const k in cur.children)
{ {
const child = this._copy_tree(cur.children[k], no_lease, value_filter); const child = this._copy_tree(cur.children[k], key === null ? k : key+'/'+k, no_lease, value_filter);
if (child) if (child)
{ {
copy.children[k] = child; copy.children[k] = child;

View File

@ -184,6 +184,25 @@ tests['update'] = async () =>
t2.destroy(); t2.destroy();
}; };
tests['dump filter'] = async () =>
{
const t1 = new EtcTree();
const leaseID = (await t1.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t1.api_txn({ success: [
{ request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.6' } } },
{ request_put: { key: '/vitastor/osd/stats/1', value: { time: 123, read: [ 10, 1000, 10459 ] } } },
] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} }, { response_put: {} } ] }
);
expect(
t1.dump(true, (key, value) => (key.substr(0, '/vitastor/osd/stats'.length) === '/vitastor/osd/stats' ? { time: value.time } : value)),
{"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"stats":{"children":{"1":{"version":1,"mod_revision":2,"create_revision":2,"value":{"time":123}}}}}}}}}}}},"mod_revision":2,"compact_revision":0}
);
t1.destroy();
};
tests['replicate watcher'] = async () => tests['replicate watcher'] = async () =>
{ {
const t = new EtcTree(); const t = new EtcTree();