Compare commits

..

3 Commits

7 changed files with 64 additions and 25 deletions

View File

@ -100,6 +100,9 @@ Specify &lt;ca&gt; = &lt;cert&gt; if your certificate is self-signed.</dd>
<dt>--ws_keepalive_interval 30000</dt> <dt>--ws_keepalive_interval 30000</dt>
<dd>Client websocket ping (keepalive) interval in milliseconds</dd> <dd>Client websocket ping (keepalive) interval in milliseconds</dd>
<dt>--use_base64 1</dt>
<dd>Use base64 encoding of keys and values, like in etcd (enabled by default).</dd>
</dl> </dl>
### Persistence ### Persistence

View File

@ -43,6 +43,8 @@ HTTP:
Require TLS client certificates signed by <ca> or by default CA to connect. Require TLS client certificates signed by <ca> or by default CA to connect.
--ws_keepalive_interval 30000 --ws_keepalive_interval 30000
Client websocket ping (keepalive) interval in milliseconds Client websocket ping (keepalive) interval in milliseconds
--use_base64 1
Use base64 encoding of keys and values, like in etcd (enabled by default).
Persistence: Persistence:
@ -105,7 +107,14 @@ function parse()
options[arg.substr(2)] = process.argv[++i]; options[arg.substr(2)] = process.argv[++i];
} }
} }
options['stale_read'] = options['stale_read'] === '1' || options['stale_read'] === 'yes' || options['stale_read'] === 'true'; if ('stale_read' in options)
{
options['stale_read'] = options['stale_read'] === '1' || options['stale_read'] === 'yes' || options['stale_read'] === 'true';
}
if ('use_base64' in options)
{
options['use_base64'] = options['use_base64'] === '1' || options['use_base64'] === 'yes' || options['use_base64'] === 'true';
}
if (options['persist_filter']) if (options['persist_filter'])
{ {
options['persist_filter'] = require(options['persist_filter'])(options); options['persist_filter'] = require(options['persist_filter'])(options);

View File

@ -22,6 +22,14 @@ class AntiEtcd extends EventEmitter
constructor(cfg) constructor(cfg)
{ {
super(); super();
if (!('use_base64' in cfg))
{
cfg.use_base64 = 1;
}
if (!('stale_read' in cfg))
{
cfg.stale_read = 1;
}
this.clients = {}; this.clients = {};
this.client_id = 1; this.client_id = 1;
this.etctree = new EtcTree(true); this.etctree = new EtcTree(true);
@ -132,11 +140,11 @@ class AntiEtcd extends EventEmitter
if (!this.cluster) if (!this.cluster)
{ {
// Run deletion compaction without followers // Run deletion compaction without followers
const mod_revision = this.antietcd.etctree.mod_revision; const mod_revision = this.etctree.mod_revision;
if (mod_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2) if (mod_revision - this.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{ {
const revision = mod_revision - (this.cfg.compact_revisions||1000); const revision = mod_revision - (this.cfg.compact_revisions||1000);
this.antietcd.etctree.compact(revision); this.etctree.compact(revision);
} }
} }
} }
@ -411,7 +419,7 @@ class AntiEtcd extends EventEmitter
// public watch API // public watch API
async create_watch(params, callback) async create_watch(params, callback)
{ {
const watch = this.etctree.api_create_watch({ ...params, watch_id: null }, callback); const watch = this.etctree.api_create_watch(this._encodeWatch(params), (msg) => callback(this._encodeMsg(msg)));
if (!watch.created) if (!watch.created)
{ {
throw new RequestError(400, 'Requested watch revision is compacted', { compact_revision: watch.compact_revision }); throw new RequestError(400, 'Requested watch revision is compacted', { compact_revision: watch.compact_revision });
@ -534,15 +542,7 @@ class AntiEtcd extends EventEmitter
const create_request = msg.create_request; const create_request = msg.create_request;
if (!create_request.watch_id || !client.watches[create_request.watch_id]) if (!create_request.watch_id || !client.watches[create_request.watch_id])
{ {
const req = { ...create_request, watch_id: null }; const watch = this.etctree.api_create_watch(this._encodeWatch(create_request), (msg) => socket.send(JSON.stringify(this._encodeMsg(msg))));
if (this.cfg.use_base64)
{
if (req.key != null)
req.key = de64(req.key);
if (req.range_end != null)
req.range_end = de64(req.range_end);
}
const watch = this.etctree.api_create_watch(req, (msg) => this.sendToSocket(socket, msg));
if (!watch.created) if (!watch.created)
{ {
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, ...watch } })); socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, ...watch } }));
@ -579,6 +579,35 @@ class AntiEtcd extends EventEmitter
} }
} }
_encodeWatch(create_request)
{
const req = { ...create_request, watch_id: null };
if (this.cfg.use_base64)
{
if (req.key != null)
req.key = de64(req.key);
if (req.range_end != null)
req.range_end = de64(req.range_end);
}
return req;
}
_encodeMsg(msg)
{
if (this.cfg.use_base64 && msg.result && msg.result.events)
{
return { ...msg, result: { ...msg.result, events: msg.result.events.map(ev => ({
...ev,
kv: !ev.kv ? ev.kv : {
...ev.kv,
key: b64(ev.kv.key),
value: b64(ev.kv.value),
},
})) } };
}
return msg;
}
_unsubscribeClient(client_id) _unsubscribeClient(client_id)
{ {
if (!this.clients[client_id]) if (!this.clients[client_id])

View File

@ -8,7 +8,7 @@ const zlib = require('zlib');
const stableStringify = require('./stable-stringify.js'); const stableStringify = require('./stable-stringify.js');
const EtcTree = require('./etctree.js'); const EtcTree = require('./etctree.js');
const { de64, runCallbacks } = require('./common.js'); const { runCallbacks } = require('./common.js');
class AntiPersistence class AntiPersistence
{ {
@ -60,20 +60,18 @@ class AntiPersistence
if (ev.lease) if (ev.lease)
{ {
// Values with lease are never persisted // Values with lease are never persisted
const key = de64(ev.key); if (this.prev_value[ev.key] !== undefined)
if (this.prev_value[key] !== undefined)
{ {
delete this.prev_value[key]; delete this.prev_value[ev.key];
changed = true; changed = true;
} }
} }
else else
{ {
const key = de64(ev.key); const filtered = this.cfg.persist_filter(ev.key, ev.value == null ? undefined : ev.value);
const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value)); if (!EtcTree.eq(filtered, this.prev_value[ev.key]))
if (!EtcTree.eq(filtered, this.prev_value[key]))
{ {
this.prev_value[key] = filtered; this.prev_value[ev.key] = filtered;
changed = true; changed = true;
} }
} }

View File

@ -22,7 +22,7 @@ function b64(k)
{ {
if (k == null) // null or undefined if (k == null) // null or undefined
return k; return k;
return this.use_base64 ? Buffer.from(k).toString('base64') : k; return Buffer.from(k).toString('base64');
} }
function runCallbacks(obj, key, new_value) function runCallbacks(obj, key, new_value)

View File

@ -573,7 +573,7 @@ class EtcTree
}); });
this.active_immediate.push(imm); this.active_immediate.push(imm);
} }
return { watch_id, created: true }; return { header: { revision: this.mod_revision }, watch_id, created: true };
} }
_get_modified(events, cur, prefix, min_rev) _get_modified(events, cur, prefix, min_rev)

View File

@ -1,6 +1,6 @@
{ {
"name": "antietcd", "name": "antietcd",
"version": "1.0.0", "version": "1.0.1",
"description": "Simplistic etcd replacement based on TinyRaft", "description": "Simplistic etcd replacement based on TinyRaft",
"main": "antietcd.js", "main": "antietcd.js",
"scripts": { "scripts": {