Compare commits

...

7 Commits

5 changed files with 74 additions and 34 deletions

View File

@ -83,6 +83,7 @@ class AntiEtcdCli
{ {
await this.del(cmd.slice(1)); await this.del(cmd.slice(1));
} }
process.exit(0);
} }
async get(keys) async get(keys)
@ -93,7 +94,7 @@ class AntiEtcdCli
} }
const txn = { success: keys.map(key => ({ request_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) }; const txn = { success: keys.map(key => ({ request_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
const res = await this.request('/v3/kv/txn', txn); const res = await this.request('/v3/kv/txn', txn);
for (const r of (res||{}).responses||[]) for (const r of res.responses||[])
{ {
if (r.response_range) if (r.response_range)
{ {
@ -119,7 +120,7 @@ class AntiEtcdCli
value = await fsp.readFile(0, { encoding: 'utf-8' }); value = await fsp.readFile(0, { encoding: 'utf-8' });
} }
const res = await this.request('/v3/kv/put', { key: b64(key), value: b64(value) }); const res = await this.request('/v3/kv/put', { key: b64(key), value: b64(value) });
if (res && res.header) if (res.header)
{ {
console.log('OK'); console.log('OK');
} }
@ -133,7 +134,7 @@ class AntiEtcdCli
} }
const txn = { success: keys.map(key => ({ request_delete_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) }; const txn = { success: keys.map(key => ({ request_delete_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
const res = await this.request('/v3/kv/txn', txn); const res = await this.request('/v3/kv/txn', txn);
for (const r of (res||{}).responses||[]) for (const r of res.responses||[])
{ {
if (r.response_delete_range) if (r.response_delete_range)
{ {
@ -147,16 +148,33 @@ class AntiEtcdCli
for (const url of this.options.endpoints) for (const url of this.options.endpoints)
{ {
const cur_url = url.replace(/\/+$/, '')+path; const cur_url = url.replace(/\/+$/, '')+path;
try const res = await POST(cur_url, body, this.options.timeout||1000);
let ok = false;
if (res.json)
{ {
return (await POST(cur_url, body, this.options.timeout||1000)).json; if (res.json.error)
{
console.error(cur_url+': '+res.json.error);
process.exit(1);
}
return res.json;
} }
catch (e) if (res.body)
{ {
console.error(cur_url+': '+e.message); console.error(cur_url+': '+res.body);
} }
if (res.error)
{
console.error(cur_url+': '+res.error);
if (!res.response || !res.response.statusCode)
{
// This URL is unavailable
continue;
}
}
break;
} }
return null; process.exit(1);
} }
} }
@ -188,9 +206,9 @@ function POST(url, body, timeout)
res.on('data', chunk => { res_body += chunk; }); res.on('data', chunk => { res_body += chunk; });
res.on('end', () => res.on('end', () =>
{ {
if (res.statusCode != 200) if (res.statusCode != 200 || !/application\/json/i.exec(res.headers['content-type']))
{ {
ok({ error: res_body, code: res.statusCode }); ok({ response: res, body: res_body, code: res.statusCode });
return; return;
} }
try try
@ -200,7 +218,7 @@ function POST(url, body, timeout)
} }
catch (e) catch (e)
{ {
ok({ error: e, response: res, body: res_body }); ok({ response: res, error: e, body: res_body });
} }
}); });
}); });

View File

@ -59,6 +59,7 @@ class AntiCluster
this.antietcd.clients[client_id].raft_node_id = node_id; this.antietcd.clients[client_id].raft_node_id = node_id;
this.antietcd.clients[client_id].addr = socket._socket.remoteAddress+':'+socket._socket.remotePort; this.antietcd.clients[client_id].addr = socket._socket.remoteAddress+':'+socket._socket.remotePort;
socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } })); socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } }));
this.raft.start();
} }
}); });
} }

View File

@ -86,13 +86,29 @@ class AntiEtcd
async persistAndReplicate(msg) async persistAndReplicate(msg)
{ {
if (this.persistence) let res = [];
{
await this.persistence.persistChange(msg);
}
if (this.cluster) if (this.cluster)
{ {
await this.cluster.replicateChange(msg); // We have to guarantee that replication is processed sequentially
// So we have to send messages without first awaiting for anything!
res.push(this.cluster.replicateChange(msg));
}
if (this.persistence)
{
res.push(this.persistence.persistChange(msg));
}
if (res.length)
{
res = await Promise.allSettled(res);
const errors = res.filter(r => r.status == 'rejected');
if (errors.length)
{
for (const e of errors)
{
console.error(e.reason);
}
throw errors[0].reason;
}
} }
} }
@ -106,6 +122,7 @@ class AntiEtcd
data = Buffer.concat(data); data = Buffer.concat(data);
let body = ''; let body = '';
let code = 200; let code = 200;
let ctype = 'text/plain; charset=utf-8';
let reply; let reply;
try try
{ {
@ -128,6 +145,7 @@ class AntiEtcd
} }
reply = await this.runHandler(req, data, res); reply = await this.runHandler(req, data, res);
reply = JSON.stringify(reply); reply = JSON.stringify(reply);
ctype = 'application/json';
} }
catch (e) catch (e)
{ {
@ -153,8 +171,8 @@ class AntiEtcd
'\n '+reply.replace(/\n/g, '\\n') '\n '+reply.replace(/\n/g, '\\n')
); );
reply = Buffer.from(reply); reply = Buffer.from(reply);
res.writeHead(200, { res.writeHead(code, {
'Content-Type': 'application/json', 'Content-Type': ctype,
'Content-Length': reply.length, 'Content-Length': reply.length,
}); });
res.write(reply); res.write(reply);

View File

@ -2,6 +2,7 @@ const fs = require('fs');
const fsp = require('fs').promises; const fsp = require('fs').promises;
const zlib = require('zlib'); const zlib = require('zlib');
const stableStringify = require('./stable-stringify.js');
const EtcTree = require('./etctree.js'); const EtcTree = require('./etctree.js');
const { de64, runCallbacks } = require('./common.js'); const { de64, runCallbacks } = require('./common.js');
@ -52,10 +53,10 @@ class AntiPersistence
let changed = false; let changed = false;
for (const ev of msg.events) for (const ev of msg.events)
{ {
if (ev.kv.lease) if (ev.lease)
{ {
// Values with lease are never persisted // Values with lease are never persisted
const key = de64(ev.kv.key); const key = de64(ev.key);
if (this.prev_value[key] !== undefined) if (this.prev_value[key] !== undefined)
{ {
delete this.prev_value[key]; delete this.prev_value[key];
@ -64,15 +65,14 @@ class AntiPersistence
} }
else else
{ {
const key = de64(ev.kv.key); const key = de64(ev.key);
const filtered = this.cfg.persist_filter(key, ev.type === 'DELETE' ? undefined : de64(ev.kv.value)); const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value));
if (!EtcTree.eq(filtered, this.prev_value[key])) if (!EtcTree.eq(filtered, this.prev_value[key]))
{ {
this.prev_value[key] = filtered; this.prev_value[key] = filtered;
changed = true; changed = true;
} }
} }
changed = true;
} }
if (!changed) if (!changed)
{ {
@ -114,7 +114,7 @@ class AntiPersistence
{ {
let dump = this.antietcd.etctree.dump(true); let dump = this.antietcd.etctree.dump(true);
dump['term'] = this.antietcd.stored_term; dump['term'] = this.antietcd.stored_term;
dump = JSON.stringify(dump); dump = stableStringify(dump);
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res))); dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
const fh = await fsp.open(this.cfg.data+'.tmp', 'w'); const fh = await fsp.open(this.cfg.data+'.tmp', 'w');
await fh.writeFile(dump); await fh.writeFile(dump);
@ -124,7 +124,7 @@ class AntiPersistence
} }
catch (e) catch (e)
{ {
console.error(e); console.error('Error persisting data to disk: '+e);
process.exit(1); process.exit(1);
} }
runCallbacks(this, 'wait_persist', null); runCallbacks(this, 'wait_persist', null);

View File

@ -232,7 +232,7 @@ class EtcTree
if (!snapshot.leases[id]) if (!snapshot.leases[id])
{ {
// Revoke without replicating and notifying // Revoke without replicating and notifying
this._sync_revoke_lease(id, notifications); this._sync_revoke_lease(id, notifications, this.mod_revision);
} }
} }
} }
@ -269,10 +269,10 @@ class EtcTree
{ {
cur_old.value = cur_new.value; cur_old.value = cur_new.value;
const key_watchers = (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers); const key_watchers = (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers);
const notify = { watchers: key_watchers, key, value: cur_old.value, mod_revision: cur_old.mod_revision }; const notify = { watchers: key_watchers, key, value: cur_new.value, mod_revision: cur_new.mod_revision };
if (cur_old.lease) if (cur_new.lease)
{ {
notify.lease = cur_old.lease; notify.lease = cur_new.lease;
} }
notifications.push(notify); notifications.push(notify);
} }
@ -366,6 +366,7 @@ class EtcTree
} }
const expires = Date.now() + req.TTL*1000; const expires = Date.now() + req.TTL*1000;
this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} }; this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} };
this.mod_revision++;
this._set_expire(id); this._set_expire(id);
if (this.replicate) if (this.replicate)
{ {
@ -389,6 +390,7 @@ class EtcTree
} }
const ttl = this.leases[id].ttl; const ttl = this.leases[id].ttl;
lease.expires = Date.now() + ttl*1000; lease.expires = Date.now() + ttl*1000;
this.mod_revision++;
this._set_expire(id); this._set_expire(id);
if (this.replicate) if (this.replicate)
{ {
@ -424,13 +426,12 @@ class EtcTree
return true; return true;
} }
_sync_revoke_lease(id, notifications) _sync_revoke_lease(id, notifications, next_revision)
{ {
if (!this.leases[id]) if (!this.leases[id])
{ {
throw new Error('unknown lease'); throw new Error('unknown lease');
} }
const next_revision = this.mod_revision + 1;
for (const key in this.leases[id].keys) for (const key in this.leases[id].keys)
{ {
this._delete_range({ key }, next_revision, notifications); this._delete_range({ key }, next_revision, notifications);
@ -447,7 +448,8 @@ class EtcTree
return null; return null;
throw new Error('unknown lease'); throw new Error('unknown lease');
} }
this._sync_revoke_lease(req.ID, notifications); this.mod_revision++;
this._sync_revoke_lease(req.ID, notifications, this.mod_revision);
if (this.replicate) if (this.replicate)
{ {
await this.notify_replicator(notifications, [ { id: req.ID } ]); await this.notify_replicator(notifications, [ { id: req.ID } ]);
@ -470,6 +472,7 @@ class EtcTree
async apply_replication(msg) async apply_replication(msg)
{ {
this.mod_revision = msg.header.revision;
const notifications = []; const notifications = [];
if ((msg.leases||[]).length) if ((msg.leases||[]).length)
{ {
@ -481,7 +484,7 @@ class EtcTree
} }
else else
{ {
this._sync_revoke_lease(lease.id, notifications); this._sync_revoke_lease(lease.id, notifications, this.mod_revision);
} }
} }
} }
@ -495,7 +498,7 @@ class EtcTree
} }
else else
{ {
this._put({ key: ev.key, value: ev.value }, ev.mod_revision, notifications); this._put({ key: ev.key, value: ev.value, lease: ev.lease }, ev.mod_revision, notifications);
} }
} }
} }