Compare commits

..

20 Commits

Author SHA1 Message Date
Vitaliy Filippov cd74554a06 Add d.ts 2024-06-01 20:38:19 +03:00
Vitaliy Filippov fdef7e67cc Add AntiEtcd link 2024-06-01 20:33:14 +03:00
Vitaliy Filippov b559873c19 Add TS type definitions for TinyRaft 2024-06-01 17:03:01 +03:00
Vitaliy Filippov a8b5729710 Fix message format (leaderPriority should be priority) 2024-06-01 17:03:01 +03:00
Vitaliy Filippov 60ab5c6530 Add a note about leadershipTimeout 2024-06-01 17:03:01 +03:00
Vitaliy Filippov a694c198cb Add README 2024-06-01 17:03:01 +03:00
Vitaliy Filippov 304eda11ae Fix 2 leaders during 1-3 partition test 2024-06-01 17:03:01 +03:00
Vitaliy Filippov ba83aaad01 Fix 1-3 partition test: check for 2 or 3 nodes in quorum 2024-06-01 17:03:01 +03:00
Vitaliy Filippov 751a11cfc5 Fix leadership timeout: confirm leadership only if quorum is met 2024-06-01 17:03:01 +03:00
Vitaliy Filippov 699e54cec7 Use fake timer for tests 2024-06-01 17:03:01 +03:00
Vitaliy Filippov e5388074e0 Send heartbeats to all nodes (not just followers) if leadership expiration is disabled
Otherwise, the following situation may happen:
- Node 1 is the leader and its term is 24
- Node 2 is a follower of 1, its term is 24 too
- Node 3 also thinks he's the leader, his term is 23, and he doesn't know that a new leader is elected
2024-06-01 17:02:32 +03:00
Vitaliy Filippov 95dbd9905a Fix missing onChange event when following a leader with greater term 2024-06-01 17:02:32 +03:00
Vitaliy Filippov 82c009b039 Add ESLint 2024-06-01 17:02:32 +03:00
Vitaliy Filippov c063bccac1 Add forgotten package.json 2024-06-01 17:02:17 +03:00
Vitaliy Filippov 1ae8ed438f Fix leader change event 2024-05-07 15:06:21 +03:00
Vitaliy Filippov 09669e3d67 Implement leader priorities, fix changing nodes 2024-04-30 11:48:55 +03:00
Vitaliy Filippov 0abf6f6736 Make node join existing quorum when seeing a VOTE message for larger term
Fixes testAdd and testRestart (with initialTerm = 1000)
2023-09-29 00:52:47 +03:00
Vitaliy Filippov c2e37eefd3 Add initialTerm to testAdd and make it fail; add a failing testRestart 2023-09-29 00:49:24 +03:00
Vitaliy Filippov fc72932255 Split long if-elseif into functions 2023-09-29 00:39:29 +03:00
Vitaliy Filippov 30a031ffc9 Add a test for leadership expiration 2023-09-29 00:35:37 +03:00
17 changed files with 24 additions and 3769 deletions

View File

@ -1,498 +0,0 @@
# AntiEtcd
Simplistic miniature etcd replacement based on [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)
- Embeddable
- REST API only, gRPC is shit and will never be supported
- [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)-based leader election
- Websocket-based cluster communication
- Supports a limited subset of etcd REST APIs
- With optional persistence
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or [VNPL-1.1](https://git.yourcmc.ru/vitalif/vitastor/src/branch/master/VNPL-1.1.txt)
# Contents
- [CLI Usage](#cli-usage)
- [CLI Client](#cli-client)
- [Options](#options)
- [HTTP](#http)
- [Persistence](#persistence)
- [Clustering](#clustering)
- [Embedded Usage](#embedded-usage)
- [About Persistence](#about-persistence)
- [Supported etcd APIs](#supported-etcd-apis)
- [/v3/kv/txn](#v3-kv-txn)
- [/v3/kv/put](#v3-kv-put)
- [/v3/kv/range](#v3-kv-range)
- [/v3/kv/deleterange](#v3-kv-deleterange)
- [/v3/lease/grant](#v3-lease-grant)
- [/v3/lease/keepalive](#v3-lease-keepalive)
- [/v3/lease/revoke or /v3/kv/lease/revoke](#v3-lease-revoke-or-v3-kv-lease-revoke)
- [Websocket-based watch APIs](#websocket-based-watch-apis)
- [HTTP Error Codes](#http-error-codes)
## CLI Usage
```
npm install antietcd
node_modules/.bin/antietcd \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381]
[other options]
```
Antietcd doesn't background itself, so use systemd or start-stop-daemon to run it as a background service.
### CLI Client
```
node_modules/.bin/anticli [OPTIONS] put <key> [<value>]
node_modules/.bin/anticli [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
node_modules/.bin/anticli [OPTIONS] del <key> [-p|--prefix]
```
For `put`, if `<value>` is not specified, it will be read from STDIN.
Options:
<dl>
<dt>--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379</dt>
<dd>Specify HTTP endpoints to connect to</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--timeout 1000</dt>
<dd>Specify request timeout in milliseconds</dd>
</dl>
## Options
### HTTP
<dl>
<dt>--port 2379</dt>
<dd>Listen port</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--ca &lt;ca&gt;</dt>
<dd>Use trusted root certificates from this file.
Specify &lt;ca&gt; = &lt;cert&gt; if your certificate is self-signed.</dd>
<dt>--client_cert_auth 1</dt>
<dd>Require TLS client certificates signed by <ca> or by default CA to connect.</dd>
<dt>--ws_keepalive_interval 30000</dt>
<dd>Client websocket ping (keepalive) interval in milliseconds</dd>
</dl>
### Persistence
<dl>
<dt>--data &lt;filename&gt;</dt>
<dd>Store persistent data in &lt;filename&gt;</dd>
<dt>--persist_interval &lt;milliseconds&gt;</dt>
<dd>Persist data on disk after this interval, not immediately after change</dd>
<dt>--persist_filter ./filter.js</dt>
<dd>Use persistence filter from ./filter.js (or a module). <br />
Persistence filter is a function(cfg) returning function(key, value) ran
for every change and returning a new value or undefined to skip persistence.</dd>
<dt>--compact_revisions 1000</dt>
<dd>Number of previous revisions to keep deletion information in memory</dd>
</dl>
### Clustering
<dl>
<dt>--node_id &lt;id></dt>
<dd>ID of this cluster node</dd>
<dt>--cluster &lt;id1&gt;=&lt;url1&gt;,&lt;id2&gt;=&lt;url2&gt;,...</dt>
<dd>All other cluster nodes</dd>
<dt>--cluster_key &lt;key&gt;</dt>
<dd>Shared cluster key for identification</dd>
<dt>--election_timeout 5000</dt>
<dd>Raft election timeout</dd>
<dt>--heartbeat_timeout 1000</dt>
<dd>Raft leader heartbeat timeout</dd>
<dt>--wait_quorum_timeout 30000</dt>
<dd>Timeout for requests to wait for quorum to come up</dd>
<dt>--leader_priority &lt;number&gt;</dt>
<dd>Raft leader priority for this node (optional)</dd>
<dt>--stale_read 1</dt>
<dd>Allow to serve reads from followers. Specify 0 to disallow</dd>
<dt>--reconnect_interval 1000</dt>
<dd>Unavailable peer connection retry interval</dd>
<dt>--dump_timeout 5000</dt>
<dd>Timeout for dump command in milliseconds</dd>
<dt>--load_timeout 5000</dt>
<dd>Timeout for load command in milliseconds</dd>
<dt>--forward_timeout 1000</dt>
<dd>Timeout for forwarding requests from follower to leader in milliseconds</dd>
<dt>--replication_timeout 1000</dt>
<dd>Timeout for replicating requests from leader to follower in milliseconds</dd>
<dt>--compact_timeout 1000</dt>
<dd>Timeout for compaction requests from leader to follower in milliseconds</dd>
</dl>
## Embedded Usage
```js
const AntiEtcd = require('antietcd');
// Configuration may contain all the same options like in CLI, without "--"
// Except that persist_filter should be a callback (key, value) => newValue
const srv = new AntiEtcd({ ...configuration });
// Start server
srv.start();
// Make a local API call in generic style:
let res = await srv.api('kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', { ...params });
// Or function-style:
res = await srv.txn(params);
res = await srv.range(params);
res = await srv.put(params);
res = await srv.deleterange(params);
res = await srv.lease_grant(params);
res = await srv.lease_revoke(params);
res = await srv.lease_keepalive(params);
// Error handling:
try
{
res = await srv.txn(params);
}
catch (e)
{
if (e instanceof AntiEtcd.RequestError)
{
// e.code is HTTP code
// e.message is error message
}
}
// Watch API:
const watch_id = await srv.create_watch(params, (message) => console.log(message));
await srv.cancel_watch(watch_id);
// Stop server
srv.stop();
```
## About Persistence
Persistence is very simple: full database is dumped into JSON, gzipped and saved as file.
By default, it is written and fsynced on disk on every change, but it can be configured
to dump DB on disk at fixed intervals, for example, at most every 500 ms - of course,
at expense of slightly reduced crash resiliency (example: `--persist_interval 500`).
You can also specify a filter to exclude some data from persistence by using the option
`--persist_filter ./filter.js`. Persistence filter code example:
```js
function example_filter(cfg)
{
// <cfg> contains all command-line options
const prefix = cfg.exclude_keys;
if (!prefix)
{
return null;
}
return (key, value) =>
{
if (key.substr(0, prefix.length) == prefix)
{
// Skip all keys with prefix from persistence
return undefined;
}
if (key === '/statistics')
{
// Return <unneeded_key> from inside value
const decoded = JSON.parse(value);
return JSON.stringify({ ...decoded, unneeded_key: undefined });
}
return value;
};
}
module.exports = example_filter;
```
## Supported etcd APIs
NOTE: `key`, `value` and `range_end` are always encoded in base64, like in original etcd.
Range requests are only supported across "directories" separated by `/`.
It means that in range requests `key` must always end with `/` and `range_end` must always
end with `0`, and that such request will return a whole subtree of keys.
### /v3/kv/txn
Request:
```ts
type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
serializable?: boolean,
}
```
`serializable` allows to serve read-only requests from follower even if `stale_read` is not enabled.
Response:
```ts
type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
}
```
### /v3/kv/put
Request:
```ts
type PutRequest = {
key: string,
value: string,
lease?: string,
}
```
Other parameters are not supported: prev_kv, ignore_value, ignore_lease.
Response:
```ts
type PutResponse = {
header: { revision: number },
}
```
### /v3/kv/range
Request:
```ts
type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
serializable?: boolean,
}
```
`serializable` allows to serve read-only requests from follower even if `stale_read` is not enabled.
Other parameters are not supported: revision, limit, sort_order, sort_target,
count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision.
Response:
```ts
type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
}
```
### /v3/kv/deleterange
Request:
```ts
type DeleteRangeRequest = {
key: string,
range_end?: string,
}
```
Other parameters are not supported: prev_kv.
Response:
```ts
type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
}
```
### /v3/lease/grant
Request:
```ts
type LeaseGrantRequest = {
ID?: string,
TTL: number,
}
```
Response:
```ts
type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
}
```
### /v3/lease/keepalive
Request:
```ts
type LeaseKeepaliveRequest = {
ID: string,
}
```
Response:
```ts
type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
}
```
### /v3/lease/revoke or /v3/kv/lease/revoke
Request:
```ts
type LeaseRevokeRequest = {
ID: string,
}
```
Response:
```ts
type LeaseRevokeResponse = {
header: { revision: number },
}
```
### Websocket-based watch APIs
Client-to-server message format:
```ts
type ClientMessage =
{ create_request: {
key: string,
range_end?: string,
start_revision?: number,
watch_id?: string,
} }
| { cancel_request: {
watch_id: string,
} }
| { progress_request: {} }
```
Server-to-client message format:
```ts
type ServerMessage = {
result: {
header: { revision: number },
watch_id: string,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' }
```
### HTTP Error Codes
- 400 for invalid requests
- 404 for unsupported API / URL not found
- 405 for non-POST request method
- 501 for unsupported API feature - non-directory range queries and so on
- 502 for server is stopping
- 503 for quorum-related errors - quorum not available and so on

View File

@ -30,7 +30,7 @@ Some replication ideas for you:
## Example Application
TODO: Extract and describe Antietcd.
[AntiEtcd](https://git.yourcmc.ru/vitalif/antietcd/)
## Usage
@ -101,3 +101,10 @@ without being re-elected.
If all priorities are equal (or just zero), the election algorithm
becomes identical to the basic algorithm without priorities.
# Author and License
Author: Vitaliy Filippov, 2024
License: [Mozilla Public License 2.0](https://www.mozilla.org/media/MPL/2.0/index.f75d2927d3c1.txt)
or [Vitastor Network Public License 1.1](https://git.yourcmc.ru/vitalif/vitastor/src/branch/master/VNPL-1.1.txt)

View File

@ -1,259 +0,0 @@
#!/usr/bin/env node
const fsp = require('fs').promises;
const http = require('http');
const https = require('https');
const help_text = `CLI for AntiEtcd
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
anticli.js [OPTIONS] put <key> [<value>]
anticli.js [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
anticli.js [OPTIONS] del <key> [-p|--prefix]
Options:
[--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379]
[--cert cert.pem] [--key key.pem] [--timeout 1000]
`;
class AntiEtcdCli
{
static parse(args)
{
const cmd = [];
const options = {};
for (let i = 2; i < args.length; i++)
{
const arg = args[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
if (arg === '-h' || arg === '--help')
{
process.stderr.write(help_text);
process.exit();
}
else if (arg == '-e' || arg == '--endpoints')
{
options['endpoints'] = args[++i].split(/\s*[,\s]+\s*/);
}
else if (arg == '-p' || arg == '--prefix')
{
options['prefix'] = true;
}
else if (arg == '-v' || arg == '--print_value_only')
{
options['print_value_only'] = true;
}
else if (arg == '-k' || arg == '--keys_only')
{
options['keys_only'] = true;
}
else if (arg[0] == '-' && arg[1] !== '-')
{
process.stderr.write('Unknown option '+arg);
process.exit(1);
}
else if (arg.substr(0, 2) == '--')
{
options[arg.substr(2)] = args[++i];
}
else
{
cmd.push(arg);
}
}
if (!cmd.length || cmd[0] != 'get' && cmd[0] != 'put' && cmd[0] != 'del')
{
process.stderr.write('Supported commands: get, put, del. Use --help to see details\n');
process.exit(1);
}
return [ cmd, options ];
}
async run(cmd, options)
{
this.options = options;
if (!this.options.endpoints)
{
this.options.endpoints = [ 'http://localhost:2379' ];
}
if (this.options.cert && this.options.key)
{
this.tls = {
key: await fsp.readFile(this.options.key),
cert: await fsp.readFile(this.options.cert),
};
}
if (cmd[0] == 'get')
{
await this.get(cmd.slice(1));
}
else if (cmd[0] == 'put')
{
await this.put(cmd[1], cmd.length > 2 ? cmd[2] : undefined);
}
else if (cmd[0] == 'del')
{
await this.del(cmd.slice(1));
}
// wait until output is fully flushed
await new Promise(ok => process.stdout.write('', ok));
await new Promise(ok => process.stderr.write('', ok));
process.exit(0);
}
async get(keys)
{
if (this.options.prefix)
{
keys = keys.map(k => k.replace(/\/+$/, ''));
}
const txn = { success: keys.map(key => ({ request_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
const res = await this.request('/v3/kv/txn', txn);
for (const r of res.responses||[])
{
if (r.response_range)
{
for (const kv of r.response_range.kvs)
{
if (!this.options.print_value_only)
{
process.stdout.write(de64(kv.key)+'\n');
}
if (!this.options.keys_only)
{
process.stdout.write(de64(kv.value)+'\n');
}
}
}
}
}
async put(key, value)
{
if (value === undefined)
{
value = await fsp.readFile(0, { encoding: 'utf-8' });
}
const res = await this.request('/v3/kv/put', { key: b64(key), value: b64(value) });
if (res.header)
{
process.stdout.write('OK\n');
}
}
async del(keys)
{
if (this.options.prefix)
{
keys = keys.map(k => k.replace(/\/+$/, ''));
}
const txn = { success: keys.map(key => ({ request_delete_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
const res = await this.request('/v3/kv/txn', txn);
for (const r of res.responses||[])
{
if (r.response_delete_range)
{
process.stdout.write(r.response_delete_range.deleted+'\n');
}
}
}
async request(path, body)
{
for (const url of this.options.endpoints)
{
const cur_url = url.replace(/\/+$/, '')+path;
const res = await POST(cur_url, this.tls||{}, body, this.options.timeout||1000);
if (res.json)
{
if (res.json.error)
{
process.stderr.write(cur_url+': '+res.json.error);
process.exit(1);
}
return res.json;
}
if (res.body)
{
process.stderr.write(cur_url+': '+res.body);
}
if (res.error)
{
process.stderr.write(cur_url+': '+res.error);
if (!res.response || !res.response.statusCode)
{
// This URL is unavailable
continue;
}
}
break;
}
process.exit(1);
}
}
function POST(url, options, body, timeout)
{
return new Promise(ok =>
{
const body_text = Buffer.from(JSON.stringify(body));
let timer_id = timeout > 0 ? setTimeout(() =>
{
if (req)
req.abort();
req = null;
ok({ error: 'timeout' });
}, timeout) : null;
let req = (url.substr(0, 6).toLowerCase() == 'https://' ? https : http).request(url, { method: 'POST', headers: {
'Content-Type': 'application/json',
'Content-Length': body_text.length,
}, timeout, ...options }, (res) =>
{
if (!req)
{
return;
}
clearTimeout(timer_id);
let res_body = '';
res.setEncoding('utf8');
res.on('error', (error) => ok({ error }));
res.on('data', chunk => { res_body += chunk; });
res.on('end', () =>
{
if (res.statusCode != 200 || !/application\/json/i.exec(res.headers['content-type']))
{
ok({ response: res, body: res_body, code: res.statusCode });
return;
}
try
{
res_body = JSON.parse(res_body);
ok({ response: res, json: res_body });
}
catch (e)
{
ok({ response: res, error: e, body: res_body });
}
});
});
req.on('error', (error) => ok({ error }));
req.on('close', () => ok({ error: new Error('Connection closed prematurely') }));
req.write(body_text);
req.end();
});
}
function b64(str)
{
return Buffer.from(str).toString('base64');
}
function de64(str)
{
return Buffer.from(str, 'base64').toString();
}
new AntiEtcdCli().run(...AntiEtcdCli.parse(process.argv)).catch(console.error);

View File

@ -1,522 +0,0 @@
const ws = require('ws');
const TinyRaft = require('./tinyraft.js');
const { runCallbacks, RequestError } = require('./common.js');
const LEADER_MISMATCH = 'raft leader/term mismatch';
const LEADER_ONLY = 1;
const NO_WAIT_QUORUM = 2;
const READ_FROM_FOLLOWER = 4;
class AntiCluster
{
constructor(antietcd)
{
this.antietcd = antietcd;
this.cfg = antietcd.cfg;
this.cluster_connections = {};
this.last_request_id = 1;
this.subrequests = {};
this.synced = false;
this.wait_sync = [];
if (!this.cfg.node_id || !this.cfg.cluster_key)
{
throw new Error('node_id and cluster_key are required in configuration if cluster is set');
}
if (!(this.cfg.cluster instanceof Object))
{
this.cfg.cluster = (''+this.cfg.cluster).trim().split(/[\s,]*,[\s,]*/)
.reduce((a, c) => { c = c.split(/\s*=\s*/); a[c[0]] = c[1]; return a; }, {});
}
this.raft = new TinyRaft({
nodes: Object.keys(this.cfg.cluster),
nodeId: this.cfg.node_id,
heartbeatTimeout: this.cfg.heartbeat_timeout,
electionTimeout: this.cfg.election_timeout,
leaderPriority: this.cfg.leader_priority||undefined,
initialTerm: this.antietcd.stored_term,
send: (to, msg) => this._sendRaftMessage(to, msg),
});
this.raft.on('change', (event) => this._handleRaftChange(event));
this.raft.start();
// Connect to all nodes and reconnect forever
for (const node_id in this.cfg.cluster)
{
this.connectToNode(node_id);
}
}
connectToNode(node_id)
{
if (node_id != this.cfg.node_id && this.cfg.cluster[node_id] &&
(!this.cluster_connections[node_id] || !this.antietcd.clients[this.cluster_connections[node_id]]))
{
const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws'), this.antietcd.tls);
const client_id = this.antietcd._startWebsocket(socket, () => setTimeout(() => this.connectToNode(node_id), this.cfg.reconnect_interval||1000));
this.cluster_connections[node_id] = client_id;
socket.on('open', () =>
{
if (this.antietcd.clients[client_id])
{
this.antietcd.clients[client_id].ready = true;
this.antietcd.clients[client_id].raft_node_id = node_id;
this.antietcd.clients[client_id].addr = socket._socket.remoteAddress+':'+socket._socket.remotePort;
socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } }));
this.raft.start();
}
});
}
}
_peerRequest(client, request, timeout)
{
const request_id = this.last_request_id++;
request.request_id = request_id;
client.socket.send(JSON.stringify(request));
const req = this.subrequests[request_id] = { client_id: client.id };
const promise = new Promise(ok => req.cb = ok);
req.timer_id = setTimeout(() => this._completeRequest(null, request_id, { error: 'timeout' }), timeout);
return promise;
}
async replicateChange(msg)
{
if (this.raft.state !== TinyRaft.LEADER)
{
return;
}
const mod_revision = this.antietcd.etctree.mod_revision;
await this._requestFollowers({ replicate: msg }, this.cfg.replication_timeout||1000);
// We have a guarantee that all revisions before mod_revision are applied by followers,
// because replication messages are either processed synchronously or serialized in
// AntiPersistence against <wait_persist>
this.sync_revision = mod_revision;
if (this.sync_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{
const revision = this.sync_revision - (this.cfg.compact_revisions||1000);
await this._requestFollowers({ compact: { revision } }, this.cfg.compact_timeout||1000);
this.antietcd.etctree.compact(revision);
}
}
_log(msg)
{
if (this.cfg.log_level > 0)
{
console.log(msg);
}
}
async _requestFollowers(msg, timeout)
{
msg.term = this.raft.term;
const followers = this.raft.followers;
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const client = this._getPeer(follower);
if (!client)
{
// One of peers is unavailable - immediate failure, request should be retried
this._log('Lost peer connection during replication - restarting election');
this.raft.start();
throw new RequestError(503, 'Peer connection is lost, please retry request');
}
}
}
const promises = [];
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const client = this._getPeer(follower);
const promise = this._peerRequest(client, msg, timeout);
promises.push(promise);
}
}
const results = await Promise.all(promises);
let i = 0;
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const result = results[i];
if (!result || result.error)
{
// One of peers is unavailable - immediate failure, request should be retried
this._log('Replication failed ('+follower+': '+(result ? result.error : 'no result')+') - restarting election');
this.raft.start();
throw new RequestError(503, 'Replication failed, please retry request');
}
i++;
}
}
}
_completeRequest(client_id, request_id, result)
{
const req = this.subrequests[request_id];
if (!req || client_id && req.client_id != client_id)
{
return;
}
delete this.subrequests[request_id];
if (req.timer_id)
{
clearTimeout(req.timer_id);
req.timer_id = null;
}
req.cb(result);
}
_handleRaftChange(event)
{
this.antietcd.emit('raftchange', event);
this._log(
'Raft '+this.cfg.node_id+': '+(event.state == TinyRaft.FOLLOWER ? 'following '+event.leader : event.state)+
', term '+event.term+(event.state == TinyRaft.LEADER ? ', followers: '+event.followers.join(', ') : '')
);
if (event.state == TinyRaft.LEADER)
{
// (Re)sync with the new set of followers
this._resync(event.followers);
this.antietcd.etctree.resume_leases();
}
else
{
this.synced = false;
this.antietcd.etctree.pause_leases();
}
}
_resync(followers)
{
this.synced = false;
if (!this.resync_state)
{
this.resync_state = {
dumps: {},
loads: {},
};
}
const seen = {};
for (const f of followers)
{
seen[f] = true;
if (f != this.cfg.node_id && !(f in this.resync_state.dumps))
{
const client = this._getPeer(f);
if (client)
{
this.resync_state.dumps[f] = null;
this._peerRequest(client, { request: {}, handler: 'dump' }, this.cfg.dump_timeout||5000).then(res =>
{
if (this.resync_state && client.raft_node_id &&
(client.raft_node_id in this.resync_state.dumps))
{
if (res.error)
{
console.error(client.raft_node_id+' dump failed with error: '+res.error);
}
else
{
this._log('Got dump from '+client.raft_node_id+' with stored term '+res.term);
}
this.resync_state.dumps[client.raft_node_id] = res.error ? null : res;
this._continueResync();
}
});
}
}
}
for (const f in this.resync_state.dumps)
{
if (!seen[f])
{
delete this.resync_state.dumps[f];
}
}
this._continueResync();
}
_continueResync()
{
if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0)
{
// Some dump(s) are still pending
return;
}
this.resync_state.dumps[this.cfg.node_id] = { ...this.antietcd.etctree.dump(), term: this.antietcd.stored_term };
let max_term = -1, with_max = [];
for (const follower in this.resync_state.dumps)
{
const dump = this.resync_state.dumps[follower];
if (dump.term > max_term)
{
max_term = dump.term;
with_max = [ follower ];
}
else if (dump.term == max_term)
{
with_max.push(follower);
}
}
if (max_term < 0 || with_max.length == 0)
{
throw new Error('BUG: no max term during resync');
}
this._log('Local term '+this.antietcd.stored_term+', max follower term '+max_term+' at nodes '+with_max.join(', '));
with_max = with_max.filter(w => w != this.cfg.node_id);
// Merge databases of all nodes with maximum term
// Force other nodes to replicate the merged DB, throwing away their own states
for (let i = 0; i < with_max.length; i++)
{
const update_only = !(i == 0 && this.antietcd.stored_term != max_term);
this._log(update_only ? 'Updating database from node '+with_max[i]+' state' : 'Copying node '+with_max[i]+' state');
this.antietcd.etctree.load(this.resync_state.dumps[with_max[i]], update_only);
}
let wait = 0;
const load_request = { term: this.raft.term, load: this.antietcd.etctree.dump() };
for (const follower in this.resync_state.dumps)
{
if (follower != this.cfg.node_id)
{
const dump = this.resync_state.dumps[follower];
if (dump.term <= max_term)
{
const client = this._getPeer(follower);
if (!client)
{
this._log('Lost peer connection during resync - restarting election');
this.raft.start();
return;
}
this._log('Copying state to '+follower);
const loadstate = this.resync_state.loads[follower] = {};
wait++;
this._peerRequest(client, load_request, this.cfg.load_timeout||5000).then(res =>
{
loadstate.result = res;
this._finishResync();
});
}
}
}
if (!wait)
{
this._finishResync();
}
}
_finishResync()
{
if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0 ||
Object.values(this.resync_state.loads).filter(d => !d.result).length > 0)
{
return;
}
// All current peers have copied the database, we can proceed
this.antietcd.stored_term = this.raft.term;
this.synced = true;
runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with followers, new term is '+this.raft.term);
}
_isWrite(path, data)
{
if (path == 'kv_txn')
{
return ((!data.compare || !data.compare.length) &&
(!data.success || !data.success.filter(f => f.request_put || f.requestPut || f.request_delete_range || f.requestDeleteRange).length) &&
(!data.failure || !data.failure.filter(f => f.request_put || f.requestPut || f.request_delete_range || f.requestDeleteRange).length));
}
return path != 'kv_range';
}
async checkRaftState(path, leaderonly, data)
{
if (!this.raft)
{
return null;
}
if (leaderonly == LEADER_ONLY && this.raft.state != TinyRaft.LEADER)
{
throw new RequestError(503, 'Not leader');
}
if (leaderonly == NO_WAIT_QUORUM && this.raft.state == TinyRaft.CANDIDATE)
{
throw new RequestError(503, 'Quorum not available');
}
if (!this.synced)
{
// Wait for quorum / initial sync with timeout
await new Promise((ok, no) =>
{
this.wait_sync.push(ok);
setTimeout(() =>
{
this.wait_sync = this.wait_sync.filter(cb => cb != ok);
no(new RequestError(503, 'Quorum not available'));
}, this.cfg.wait_quorum_timeout||30000);
});
}
if (this.raft.state == TinyRaft.FOLLOWER &&
(this._isWrite(path, data) || !this.cfg.stale_read && !(leaderonly & READ_FROM_FOLLOWER)))
{
// Forward to leader
return await this._forwardToLeader(path, data);
}
return null;
}
async _forwardToLeader(handler, data)
{
const client = this._getPeer(this.raft.leader);
if (!client)
{
throw new RequestError(503, 'Leader is unavailable');
}
return await this._peerRequest(client, { handler, request: data }, this.cfg.forward_timeout||1000);
}
handleWsMsg(client, msg)
{
if (msg.raft)
{
if (client.raft_node_id)
{
this.raft.onReceive(client.raft_node_id, msg.raft);
}
}
else if (msg.identify)
{
if (msg.identify.key === this.cfg.cluster_key &&
msg.identify.node_id != this.cfg.node_id)
{
client.raft_node_id = msg.identify.node_id;
this._log('Got a connection from '+client.raft_node_id);
}
}
else if (msg.load)
{
this._handleLoadMsg(client, msg).catch(console.error);
}
else if (msg.replicate)
{
this._handleReplicateMsg(client, msg).catch(console.error);
}
else if (msg.request)
{
this._handleRequestMsg(client, msg).catch(console.error);
}
else if (msg.reply)
{
this._completeRequest(client.id, msg.request_id, msg.reply);
}
else if (msg.compact)
{
this._handleCompactMsg(client, msg);
}
}
async _handleRequestMsg(client, msg)
{
try
{
const res = await this.antietcd.api(msg.handler, msg.request);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: res }));
}
catch (e)
{
console.error(e);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: e.message } }));
}
}
async _handleLoadMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
this.antietcd.etctree.load(msg.load);
if (this.antietcd.persistence)
{
await this.antietcd.persistence.persist();
}
this.antietcd.stored_term = msg.term;
this.synced = true;
runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with leader, new term is '+msg.term);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
async _handleReplicateMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
await this.antietcd.etctree.apply_replication(msg.replicate);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
_handleCompactMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
this.antietcd.etctree.compact(msg.compact.revision);
this._log('Compacted deletions up to '+msg.compact.revision);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
_getPeer(to)
{
if (to == this.cfg.node_id)
{
throw new Error('BUG: attempt to get connection to self');
}
const client_id = this.cluster_connections[to];
if (!client_id)
{
return null;
}
const client = this.antietcd.clients[client_id];
if (!client || !client.ready)
{
return null;
}
return client;
}
_sendRaftMessage(to, msg)
{
const client = this._getPeer(to);
if (client)
{
client.socket.send(JSON.stringify({ raft: msg }));
}
}
}
AntiCluster.LEADER_ONLY = LEADER_ONLY;
AntiCluster.NO_WAIT_QUORUM = NO_WAIT_QUORUM;
AntiCluster.READ_FROM_FOLLOWER = READ_FROM_FOLLOWER;
module.exports = AntiCluster;

View File

@ -1,118 +0,0 @@
#!/usr/bin/env node
const AntiEtcd = require('./antietcd.js');
const help_text = `Miniature etcd replacement based on TinyRaft
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
${process.argv[0]} ${process.argv[1]} \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist-filter ./filter.js] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381] \
[other options]
Supported etcd REST APIs:
/v3/kv/txn /v3/kv/put /v3/kv/range /v3/kv/deleterange
/v3/lease/grant /v3/lease/keepalive /v3/lease/revoke /v3/kv/lease/revoke
websocket-based watch API (create_request, cancel_request, progress_request)
Options:
HTTP:
--port 2379
Listen port
--cert <cert>
Use TLS with this certificate file (PEM format)
--key <key>
Use TLS with this key file (PEM format)
--ca <ca>
Use trusted root certificates from this file.
Specify <ca> = <cert> if your certificate is self-signed.
--client_cert_auth 1
Require TLS client certificates signed by <ca> or by default CA to connect.
--ws_keepalive_interval 30000
Client websocket ping (keepalive) interval in milliseconds
Persistence:
--data <filename>
Store persistent data in <filename>
--persist_interval <milliseconds>
Persist data on disk after this interval, not immediately after change
--persist_filter ./filter.js
Use persistence filter from ./filter.js (or a module).
Persistence filter is a function(cfg) returning function(key, value) ran
for every change and returning a new value or undefined to skip persistence.
--compact_revisions 1000
Number of previous revisions to keep deletion information in memory
Clustering:
--node_id <id>
ID of this cluster node
--cluster <id1>=<url1>,<id2>=<url2>,...
All other cluster nodes
--cluster_key <key>
Shared cluster key for identification
--election_timeout 5000
Raft election timeout
--heartbeat_timeout 1000
Raft leader heartbeat timeout
--wait_quorum_timeout 30000
Timeout for requests to wait for quorum to come up
--leader_priority <number>
Raft leader priority for this node (optional)
--stale_read 1
Allow to serve reads from followers. Specify 0 to disallow
--reconnect_interval 1000
Unavailable peer connection retry interval
--dump_timeout 5000
Timeout for dump command in milliseconds
--load_timeout 5000
Timeout for load command in milliseconds
--forward_timeout 1000
Timeout for forwarding requests from follower to leader in milliseconds
--replication_timeout 1000
Timeout for replicating requests from leader to follower in milliseconds
--compact_timeout 1000
Timeout for compaction requests from leader to follower in milliseconds
`;
function parse()
{
const options = { stale_read: 1 };
for (let i = 2; i < process.argv.length; i++)
{
const arg = process.argv[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
if (arg === '-h' || arg === '--help')
{
process.stderr.write(help_text);
process.exit();
}
else if (arg.substr(0, 2) == '--')
{
options[arg.substr(2)] = process.argv[++i];
}
}
options['stale_read'] = options['stale_read'] === '1' || options['stale_read'] === 'yes' || options['stale_read'] === 'true';
if (options['persist_filter'])
{
options['persist_filter'] = require(options['persist_filter'])(options);
}
return options;
}
const antietcd = new AntiEtcd(parse());
// Set exit hook
const on_stop_cb = async () => { await antietcd.stop(); process.exit(0); };
process.on('SIGINT', on_stop_cb);
process.on('SIGTERM', on_stop_cb);
process.on('SIGQUIT', on_stop_cb);
antietcd.start().catch(console.error);

155
antietcd.d.ts vendored
View File

@ -1,155 +0,0 @@
import type { EventEmitter } from 'events';
import type { TinyRaftEvents } from './tinyraft';
export type AntiEtcdEvents = {
raftchange: TinyRaftEvents['change'],
};
export class AntiEtcd extends EventEmitter<AntiEtcdEvents>
{
constructor(cfg: object);
start(): Promise<void>;
stop(): Promise<void>;
api(path: 'kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', params: object): Promise<object>;
txn(params: TxnRequest): Promise<TxnResponse>;
range(params: RangeRequest): Promise<RangeResponse>;
put(params: PutRequest): Promise<PutResponse>;
deleterange(params: DeleteRangeRequest): Promise<DeleteRangeResponse>;
lease_grant(params: LeaseGrantRequest): Promise<LeaseGrantResponse>;
lease_revoke(params: LeaseRevokeRequest): Promise<LeaseRevokeResponse>;
lease_keepalive(params: LeaseKeepaliveRequest): Promise<LeaseKeepaliveResponse>;
create_watch(params: WatchCreateRequest, callback: (ServerMessage) => void): Promise<string|number>;
cancel_watch(watch_id: string|number): Promise<void>;
}
export type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
serializable?: boolean,
};
export type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
};
export type PutRequest = {
key: string,
value: string,
lease?: string,
};
export type PutResponse = {
header: { revision: number },
};
export type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
serializable?: boolean,
};
export type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
};
export type DeleteRangeRequest = {
key: string,
range_end?: string,
};
export type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
};
export type LeaseGrantRequest = {
ID?: string,
TTL: number,
};
export type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
};
export type LeaseKeepaliveRequest = {
ID: string,
};
export type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
};
export type LeaseRevokeRequest = {
ID: string,
};
export type LeaseRevokeResponse = {
header: { revision: number },
};
export type WatchCreateRequest = {
key: string,
range_end?: string,
start_revision?: number,
watch_id?: string|number,
}
export type ClientMessage =
{ create_request: WatchCreateRequest }
| { cancel_request: { watch_id: string } }
| { progress_request: {} };
export type ServerMessage = {
result: {
header: { revision: number },
watch_id: string|number,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' };

View File

@ -1,548 +0,0 @@
#!/usr/bin/node
const fsp = require('fs').promises;
const { URL } = require('url');
const http = require('http');
const https = require('https');
const EventEmitter = require('events');
const ws = require('ws');
const EtcTree = require('./etctree.js');
const AntiPersistence = require('./antipersistence.js');
const AntiCluster = require('./anticluster.js');
const { runCallbacks, RequestError } = require('./common.js');
class AntiEtcd extends EventEmitter
{
constructor(cfg)
{
super();
this.clients = {};
this.client_id = 1;
this.etctree = new EtcTree(true);
this.persistence = null;
this.cluster = null;
this.stored_term = 0;
this.cfg = cfg;
this.loading = false;
this.stopped = false;
this.inflight = 0;
this.wait_inflight = [];
this.api_watches = {};
}
async start()
{
if (this.cfg.data || this.cfg.cluster)
{
this.etctree.set_replicate_watcher(msg => this._persistAndReplicate(msg));
}
if (this.cfg.data)
{
this.persistence = new AntiPersistence(this);
// Load data from disk
await this.persistence.load();
}
if (this.cfg.cluster)
{
this.cluster = new AntiCluster(this);
}
if (this.cfg.cert)
{
this.tls = {
key: await fsp.readFile(this.cfg.key),
cert: await fsp.readFile(this.cfg.cert),
};
if (this.cfg.ca)
{
this.tls.ca = await fsp.readFile(this.cfg.ca);
}
if (this.cfg.client_cert_auth)
{
this.tls.requestCert = true;
}
this.server = https.createServer(this.tls, (req, res) => this._handleRequest(req, res));
}
else
{
this.server = http.createServer((req, res) => this._handleRequest(req, res));
}
this.wss = new ws.WebSocketServer({ server: this.server });
// eslint-disable-next-line no-unused-vars
this.wss.on('connection', (conn, req) => this._startWebsocket(conn, null));
this.server.listen(this.cfg.port || 2379);
}
async stop()
{
if (this.stopped)
{
return;
}
this.stopped = true;
// Wait until all requests complete
while (this.inflight > 0)
{
await new Promise(ok => this.wait_inflight.push(ok));
}
if (this.persistence)
{
await this.persistence.persist();
}
}
async _persistAndReplicate(msg)
{
let res = [];
if (this.cluster)
{
// We have to guarantee that replication is processed sequentially
// So we have to send messages without first awaiting for anything!
res.push(this.cluster.replicateChange(msg));
}
if (this.persistence)
{
res.push(this.persistence.persistChange(msg));
}
if (res.length == 1)
{
await res[0];
}
else if (res.length > 0)
{
let done = 0;
await new Promise((allOk, allNo) =>
{
res.map(promise => promise.then(res =>
{
if ((++done) == res.length)
allOk();
}).catch(e =>
{
console.error(e);
allNo(e);
}));
});
}
if (!this.cluster)
{
// Run deletion compaction without followers
const mod_revision = this.antietcd.etctree.mod_revision;
if (mod_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{
const revision = mod_revision - (this.cfg.compact_revisions||1000);
this.antietcd.etctree.compact(revision);
}
}
}
_handleRequest(req, res)
{
let data = [];
req.on('data', (chunk) => data.push(chunk));
req.on('end', async () =>
{
this.inflight++;
data = Buffer.concat(data);
let body = '';
let code = 200;
let ctype = 'text/plain; charset=utf-8';
let reply;
try
{
if (req.headers['content-type'] != 'application/json')
{
throw new RequestError(400, 'content-type should be application/json');
}
body = data.toString();
try
{
data = data.length ? JSON.parse(data) : {};
}
catch (e)
{
throw new RequestError(400, 'body should be valid JSON');
}
if (!(data instanceof Object) || data instanceof Array)
{
throw new RequestError(400, 'body should be JSON object');
}
reply = await this._runHandler(req, data);
reply = JSON.stringify(reply);
ctype = 'application/json';
}
catch (e)
{
if (e instanceof RequestError)
{
code = e.code;
reply = e.message;
}
else
{
console.error(e);
code = 500;
reply = 'Internal error: '+e.message;
}
}
try
{
// Access log
if (this.cfg.log_level > 1)
{
console.log(
new Date().toISOString()+
' '+(req.headers['x-forwarded-for'] || (req.socket.remoteAddress + ':' + req.socket.remotePort))+
' '+req.method+' '+req.url+' '+code+'\n '+body.replace(/\n/g, '\\n')+
'\n '+reply.replace(/\n/g, '\\n')
);
}
reply = Buffer.from(reply);
res.writeHead(code, {
'Content-Type': ctype,
'Content-Length': reply.length,
});
res.write(reply);
res.end();
}
catch (e)
{
console.error(e);
}
this.inflight--;
if (!this.inflight)
{
runCallbacks(this, 'wait_inflight', []);
}
});
}
_startWebsocket(socket, reconnect)
{
const client_id = this.client_id++;
this.clients[client_id] = {
id: client_id,
addr: socket._socket ? socket._socket.remoteAddress+':'+socket._socket.remotePort : '',
socket,
alive: true,
watches: {},
};
socket.on('pong', () => this.clients[client_id].alive = true);
socket.on('error', e => console.error(e.syscall === 'connect' ? e.message : e));
const pinger = setInterval(() =>
{
if (!this.clients[client_id])
{
return;
}
if (!this.clients[client_id].alive)
{
return socket.terminate();
}
this.clients[client_id].alive = false;
socket.ping(() => {});
}, this.cfg.ws_keepalive_interval||30000);
socket.on('message', (msg) =>
{
try
{
msg = JSON.parse(msg);
}
catch (e)
{
socket.send(JSON.stringify({ error: 'bad-json' }));
return;
}
if (!msg)
{
socket.send(JSON.stringify({ error: 'empty-message' }));
}
else
{
this._handleMessage(client_id, msg, socket);
}
});
socket.on('close', () =>
{
this._unsubscribeClient(client_id);
clearInterval(pinger);
delete this.clients[client_id];
socket.terminate();
if (reconnect)
{
reconnect();
}
});
return client_id;
}
async _runHandler(req, data)
{
// v3/kv/txn
// v3/kv/range
// v3/kv/put
// v3/kv/deleterange
// v3/lease/grant
// v3/lease/keepalive
// v3/lease/revoke O_o
// v3/kv/lease/revoke O_o
const requestUrl = new URL(req.url, 'http://'+(req.headers.host || 'localhost'));
if (requestUrl.searchParams.get('leaderonly'))
{
data.leaderonly = true;
}
if (requestUrl.searchParams.get('serializable'))
{
data.serializable = true;
}
if (requestUrl.searchParams.get('nowaitquorum'))
{
data.nowaitquorum = true;
}
try
{
if (requestUrl.pathname.substr(0, 4) == '/v3/')
{
const path = requestUrl.pathname.substr(4).replace(/\/+$/, '').replace(/\/+/g, '_');
if (req.method != 'POST')
{
throw new RequestError(405, 'Please use POST method');
}
return await this.api(path, data);
}
else if (requestUrl.pathname == '/dump')
{
return await this.api('dump', data);
}
else
{
throw new RequestError(404, '');
}
}
catch (e)
{
if ((e instanceof RequestError) && e.code == 404)
{
throw new RequestError(404, 'Supported APIs: /v3/kv/txn, /v3/kv/range, /v3/kv/put, /v3/kv/deleterange, '+
'/v3/lease/grant, /v3/lease/revoke, /v3/kv/lease/revoke, /v3/lease/keepalive');
}
else
{
throw e;
}
}
}
// public generic handler
async api(path, data)
{
if (this.stopped)
{
throw new RequestError(502, 'Server is stopping');
}
if (path !== 'dump' && this.cluster)
{
const res = await this.cluster.checkRaftState(
path,
(data.leaderonly ? AntiCluster.LEADER_ONLY : 0) |
(data.serializable ? AntiCluster.READ_FROM_FOLLOWER : 0) |
(data.nowaitquorum ? AntiCluster.NO_WAIT_QUORUM : 0),
data
);
if (res)
{
return res;
}
}
const cb = this['_handle_'+path];
if (cb)
{
const res = cb.call(this, data);
if (res instanceof Promise)
{
return await res;
}
return res;
}
throw new RequestError(404, 'Unsupported API');
}
// public wrappers
async txn(params)
{
return await this.api('kv_txn', params);
}
async range(params)
{
return await this.api('kv_range', params);
}
async put(params)
{
return await this.api('kv_put', params);
}
async deleterange(params)
{
return await this.api('kv_deleterange', params);
}
async lease_grant(params)
{
return await this.api('lease_grant', params);
}
async lease_revoke(params)
{
return await this.api('lease_revoke', params);
}
async lease_keepalive(params)
{
return await this.api('lease_keepalive', params);
}
// public watch API
async create_watch(params, callback)
{
const watch = this.etctree.api_create_watch({ ...params, watch_id: null }, callback);
if (!watch.created)
{
throw new RequestError(400, 'Requested watch revision is compacted', { compact_revision: watch.compact_revision });
}
const watch_id = params.watch_id || watch.watch_id;
this.api_watches[watch_id] = watch.watch_id;
return watch_id;
}
async cancel_watch(watch_id)
{
const mapped_id = this.api_watches[watch_id];
if (!mapped_id)
{
throw new RequestError(400, 'Watch not found');
}
this.etctree.api_cancel_watch({ watch_id: mapped_id });
delete this.api_watches[watch_id];
}
// internal handlers
async _handle_kv_txn(data)
{
return await this.etctree.api_txn(data);
}
async _handle_kv_range(data)
{
const r = await this.etctree.api_txn({ success: [ { request_range: data } ] });
return { header: r.header, ...r.responses[0].response_range };
}
async _handle_kv_put(data)
{
const r = await this.etctree.api_txn({ success: [ { request_put: data } ] });
return { header: r.header, ...r.responses[0].response_put };
}
async _handle_kv_deleterange(data)
{
const r = await this.etctree.api_txn({ success: [ { request_delete_range: data } ] });
return { header: r.header, ...r.responses[0].response_delete_range };
}
_handle_lease_grant(data)
{
return this.etctree.api_grant_lease(data);
}
_handle_lease_revoke(data)
{
return this.etctree.api_revoke_lease(data);
}
_handle_kv_lease_revoke(data)
{
return this.etctree.api_revoke_lease(data);
}
_handle_lease_keepalive(data)
{
return this.etctree.api_keepalive_lease(data);
}
// eslint-disable-next-line no-unused-vars
_handle_dump(data)
{
return { ...this.etctree.dump(), term: this.stored_term };
}
_handleMessage(client_id, msg, socket)
{
const client = this.clients[client_id];
if (this.cfg.access_log)
{
console.log(new Date().toISOString()+' '+client.addr+' '+(client.raft_node_id || '-')+' -> '+JSON.stringify(msg));
}
if (msg.create_request)
{
const create_request = msg.create_request;
if (!create_request.watch_id || !client.watches[create_request.watch_id])
{
const watch = this.etctree.api_create_watch(
{ ...create_request, watch_id: null }, (msg) => socket.send(JSON.stringify(msg))
);
if (!watch.created)
{
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, ...watch } }));
}
else
{
create_request.watch_id = create_request.watch_id || watch.watch_id;
client.watches[create_request.watch_id] = watch.watch_id;
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, created: true } }));
}
}
}
else if (msg.cancel_request)
{
const mapped_id = client.watches[msg.cancel_request.watch_id];
if (mapped_id)
{
this.etctree.api_cancel_watch({ watch_id: mapped_id });
delete client.watches[msg.cancel_request.watch_id];
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: msg.cancel_request.watch_id, canceled: true } }));
}
}
else if (msg.progress_request)
{
socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision } } }));
}
else
{
if (!this.cluster)
{
return;
}
this.cluster.handleWsMsg(client, msg);
}
}
_unsubscribeClient(client_id)
{
if (!this.clients[client_id])
{
return;
}
for (const watch_id in this.clients[client_id].watches)
{
const mapped_id = this.clients[client_id].watches[watch_id];
this.etctree.api_cancel_watch({ watch_id: mapped_id });
}
}
}
AntiEtcd.RequestError = RequestError;
module.exports = AntiEtcd;

View File

@ -1,134 +0,0 @@
const fs = require('fs');
const fsp = require('fs').promises;
const zlib = require('zlib');
const stableStringify = require('./stable-stringify.js');
const EtcTree = require('./etctree.js');
const { de64, runCallbacks } = require('./common.js');
class AntiPersistence
{
constructor(antietcd)
{
this.cfg = antietcd.cfg;
this.antietcd = antietcd;
this.prev_value = {};
this.persist_timer = null;
this.wait_persist = null;
}
async load()
{
// eslint-disable-next-line no-unused-vars
const [ err, stat ] = await new Promise(ok => fs.stat(this.cfg.data, (err, stat) => ok([ err, stat ])));
if (!err)
{
let data = await fsp.readFile(this.cfg.data);
data = await new Promise((ok, no) => zlib.gunzip(data, (err, res) => err ? no(err) : ok(res)));
data = JSON.parse(data);
this.loading = true;
this.antietcd.etctree.load(data);
this.loading = false;
this.antietcd.stored_term = data['term'] || 0;
}
else if (err.code != 'ENOENT')
{
throw err;
}
}
async persistChange(msg)
{
if (this.loading)
{
return;
}
if (!msg.events || !msg.events.length)
{
// lease-only changes don't need to be persisted
return;
}
if (this.cfg.persist_filter)
{
let changed = false;
for (const ev of msg.events)
{
if (ev.lease)
{
// Values with lease are never persisted
const key = de64(ev.key);
if (this.prev_value[key] !== undefined)
{
delete this.prev_value[key];
changed = true;
}
}
else
{
const key = de64(ev.key);
const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value));
if (!EtcTree.eq(filtered, this.prev_value[key]))
{
this.prev_value[key] = filtered;
changed = true;
}
}
}
if (!changed)
{
return;
}
}
await this.schedulePersist();
}
async schedulePersist()
{
if (!this.cfg.persist_interval)
{
await this.persist();
return;
}
if (!this.persist_timer)
{
this.persist_timer = setTimeout(() =>
{
this.persist_timer = null;
this.persist().catch(console.error);
}, this.cfg.persist_interval);
}
}
async persist()
{
if (!this.cfg.data)
{
return;
}
while (this.wait_persist)
{
await new Promise(ok => this.wait_persist.push(ok));
}
this.wait_persist = [];
try
{
let dump = this.antietcd.etctree.dump(true);
dump['term'] = this.antietcd.stored_term;
dump = stableStringify(dump);
dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
const fh = await fsp.open(this.cfg.data+'.tmp', 'w');
await fh.writeFile(dump);
await fh.sync();
await fh.close();
await fsp.rename(this.cfg.data+'.tmp', this.cfg.data);
}
catch (e)
{
console.error('Error persisting data to disk: '+e);
process.exit(1);
}
runCallbacks(this, 'wait_persist', null);
}
}
module.exports = AntiPersistence;

View File

@ -1,35 +0,0 @@
class RequestError
{
constructor(code, text, details)
{
this.code = code;
this.message = text;
this.details = details;
}
}
function de64(k)
{
if (k == null) // null or undefined
return k;
return Buffer.from(k, 'base64').toString();
}
function runCallbacks(obj, key, new_value)
{
const cbs = obj[key];
obj[key] = new_value;
if (cbs)
{
for (const cb of cbs)
{
cb();
}
}
}
module.exports = {
RequestError,
de64,
runCallbacks,
};

View File

@ -1,871 +0,0 @@
const crypto = require('crypto');
const stableStringify = require('./stable-stringify.js');
const { RequestError } = require('./common.js');
/*type TreeNode = {
value?: any,
create_revision?: number,
mod_revision?: number,
version?: number,
lease?: string,
children: { [string]: TreeNode },
watchers?: number[],
key_watchers?: number[],
};*/
class EtcTree
{
constructor(use_base64)
{
this.state = {};
this.leases = {};
this.watchers = {};
this.watcher_id = 0;
this.mod_revision = 0;
this.compact_revision = 0;
this.use_base64 = use_base64;
this.replicate = null;
this.paused = false;
this.active_immediate = [];
}
destroy()
{
this.pause_leases();
for (const imm of this.active_immediate)
{
clearImmediate(imm);
}
}
set_replicate_watcher(replicate)
{
// Replication watcher is special:
// It should be an async function and it is called BEFORE notifying all
// other watchers about any change.
// It may also throw to prevent notifying at all if replication fails.
this.replicate = replicate;
}
de64(k)
{
if (k == null) // null or undefined
return k;
return this.use_base64 ? Buffer.from(k, 'base64').toString() : k;
}
b64(k)
{
if (k == null) // null or undefined
return k;
return this.use_base64 ? Buffer.from(k).toString('base64') : k;
}
_check(chk)
{
const parts = this._key_parts(this.de64(chk.key));
const { cur } = this._get_subtree(parts, false, false);
let check_value, ref_value;
if (chk.target === 'MOD')
{
check_value = cur && cur.mod_revision || 0;
ref_value = chk.mod_revision || 0;
}
else if (chk.target === 'CREATE')
{
check_value = cur && cur.create_revision || 0;
ref_value = chk.create_revision || 0;
}
else if (chk.target === 'VERSION')
{
check_value = cur && cur.version || 0;
ref_value = chk.version || 0;
}
else if (chk.target === 'LEASE')
{
check_value = cur && cur.lease;
ref_value = chk.lease;
}
else if (chk.target === 'VALUE')
{
check_value = cur && cur.value;
ref_value = chk.value;
}
else
{
throw new RequestError(501, 'Unsupported comparison target: '+chk.target);
}
if (chk.result === 'LESS')
{
return check_value < ref_value;
}
else if (chk.result)
{
throw new RequestError(501, 'Unsupported comparison result: '+chk.result);
}
return check_value == ref_value;
}
_key_parts(key)
{
const parts = key.replace(/\/\/+/g, '/').replace(/\/$/g, ''); // trim beginning?
return parts === '' ? [] : parts.split('/');
}
_get_range(req)
{
const key = this.de64(req.key);
const end = this.de64(req.range_end);
if (end != null && (key[key.length-1] != '/' || end[end.length-1] != '0' ||
end.substr(0, end.length-1) !== key.substr(0, key.length-1)))
{
throw new RequestError(501, 'Non-directory range queries are unsupported');
}
const parts = this._key_parts(key);
return { parts, all: end != null };
}
_get_subtree(parts, create, notify)
{
let cur = this.state;
let watchers = notify ? [] : null;
for (let k of parts)
{
if (notify && cur.watchers)
{
watchers.push.apply(watchers, cur.watchers);
}
if (!cur.children)
{
if (!create)
{
return {};
}
cur.children = {};
}
if (!cur.children[k])
{
if (!create)
{
return {};
}
cur.children[k] = {};
}
cur = cur.children[k];
}
if (notify && cur.watchers)
{
watchers.push.apply(watchers, cur.watchers);
}
return { watchers, cur };
}
// create a snapshot of all data including leases
dump(persistent_only, value_filter)
{
const snapshot = {
state: this._copy_tree(this.state, persistent_only, value_filter) || {},
mod_revision: this.mod_revision,
compact_revision: this.compact_revision,
};
if (!persistent_only)
{
snapshot.leases = {};
for (const id in this.leases)
{
const lease = this.leases[id];
snapshot.leases[id] = { ttl: lease.ttl, expires: lease.expires };
}
}
return snapshot;
}
_copy_tree(cur, no_lease, value_filter)
{
let nonempty = cur.value != null && (!no_lease || !cur.lease);
let filtered;
if (nonempty && value_filter)
{
filtered = value_filter(cur.value);
nonempty = nonempty && filtered != null;
}
const copy = (nonempty ? { ...cur } : {});
copy.children = {};
if (nonempty && value_filter)
{
copy.value = filtered;
}
delete copy.watchers;
delete copy.key_watchers;
let has_children = false;
for (const k in cur.children)
{
const child = this._copy_tree(cur.children[k], no_lease, value_filter);
if (child)
{
copy.children[k] = child;
has_children = true;
}
}
if (!nonempty && !has_children)
{
return null;
}
if (!has_children)
{
delete copy.children;
}
return copy;
}
// load snapshot of all data including leases
load(snapshot, update_only)
{
if (!update_only || this.mod_revision < snapshot.mod_revision)
{
this.mod_revision = snapshot.mod_revision;
}
if (!update_only || this.compact_revision > (snapshot.compact_revision||0))
{
this.compact_revision = snapshot.compact_revision||0;
}
// First apply leases
const notifications = [];
if (!update_only && snapshot.leases)
{
for (const id in this.leases)
{
if (!snapshot.leases[id])
{
// Revoke without replicating and notifying
this._sync_revoke_lease(id, notifications, this.mod_revision);
}
}
}
for (const id in snapshot.leases||{})
{
this.load_lease({ id, ...snapshot.leases[id] });
}
// Then find and apply the difference in data
this._restore_diff(update_only, this.state, snapshot.state, null, this.state.watchers || [], notifications);
this._notify(notifications);
}
_restore_diff(update_only, cur_old, cur_new, prefix, watchers, notifications)
{
if (!update_only || !cur_old.mod_revision || cur_old.mod_revision < cur_new.mod_revision)
{
const key = prefix === null ? '' : prefix;
if (!eq(cur_old.lease, cur_new.lease))
{
if (cur_old.lease && this.leases[cur_old.lease])
{
delete this.leases[cur_old.lease].keys[key];
}
cur_old.lease = cur_new.lease;
if (cur_new.lease && this.leases[cur_new.lease])
{
this.leases[cur_new.lease].keys[key] = true;
}
}
cur_old.mod_revision = cur_new.mod_revision;
cur_old.create_revision = cur_new.create_revision;
cur_old.version = cur_new.version;
if (!eq(cur_old.value, cur_new.value))
{
cur_old.value = cur_new.value;
const key_watchers = (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers);
const notify = { watchers: key_watchers, key, value: cur_new.value, mod_revision: cur_new.mod_revision };
if (cur_new.lease)
{
notify.lease = cur_new.lease;
}
notifications.push(notify);
}
}
cur_old.children = cur_old.children || {};
for (const k in cur_new.children)
{
if (!cur_old.children[k])
{
cur_old.children[k] = cur_new.children[k];
}
else
{
this._restore_diff(
update_only, cur_old.children[k], cur_new.children[k],
prefix === null ? k : prefix+'/'+k,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
notifications
);
}
}
if (!update_only)
{
for (const k in cur_old.children)
{
if (!cur_new.children || !cur_new.children[k])
{
// Delete subtree
this._delete_all(
notifications,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
cur_old.children[k], true,
prefix === null ? k : prefix+'/'+k,
this.mod_revision
);
}
}
}
}
// slave/follower nodes don't expire leases themselves, they listen for the leader instead
pause_leases()
{
if (this.paused)
{
return;
}
this.paused = true;
for (const id in this.leases)
{
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
}
}
resume_leases()
{
if (!this.paused)
{
return;
}
this.paused = false;
for (const id in this.leases)
{
this._set_expire(id);
}
}
_set_expire(id)
{
if (!this.paused)
{
const lease = this.leases[id];
if (!lease.timer_id)
{
lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }).catch(console.error), lease.expires - Date.now());
}
}
}
async api_grant_lease(req)
{
let id;
while (!id || this.leases[id])
{
id = crypto.randomBytes(8).toString('hex');
}
const expires = Date.now() + req.TTL*1000;
this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} };
this.mod_revision++;
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl: req.TTL, expires } ] });
}
return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL };
}
async api_keepalive_lease(req)
{
const id = req.ID;
if (!this.leases[id])
{
throw new RequestError(400, 'unknown lease');
}
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
const ttl = this.leases[id].ttl;
lease.expires = Date.now() + ttl*1000;
this.mod_revision++;
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl, expires: lease.expires } ] });
}
// extra wrapping in { result: ... }
return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } };
}
load_lease(lease)
{
const id = lease.id;
if (!this.leases[id])
{
this.leases[id] = { ...lease, timer_id: null, keys: {} };
}
else if (this.leases[id].ttl != lease.ttl ||
this.leases[id].expires != lease.expires)
{
this.leases[id].ttl = lease.ttl;
this.leases[id].expires = lease.expires;
}
else
{
return false;
}
if (this.leases[id].timer_id)
{
clearTimeout(this.leases[id].timer_id);
this.leases[id].timer_id = null;
}
this._set_expire(id);
return true;
}
_sync_revoke_lease(id, notifications, next_revision)
{
if (!this.leases[id])
{
throw new RequestError(400, 'unknown lease');
}
for (const key in this.leases[id].keys)
{
this._delete_range({ key }, next_revision, notifications);
}
delete this.leases[id];
}
async api_revoke_lease(req, no_throw)
{
const notifications = [];
if (!this.leases[req.ID])
{
if (no_throw)
return null;
throw new RequestError(400, 'unknown lease');
}
this.mod_revision++;
this._sync_revoke_lease(req.ID, notifications, this.mod_revision);
if (this.replicate)
{
await this.notify_replicator(notifications, [ { id: req.ID } ]);
}
this._notify(notifications);
return { header: { revision: this.mod_revision } };
}
async notify_replicator(notifications, leases)
{
// First replicate the change and then notify watchers about it
const all_changes = {};
for (const chg of notifications)
{
all_changes[chg.key] = { ...chg };
delete all_changes[chg.key].watchers;
}
await this.replicate({ header: { revision: this.mod_revision }, events: Object.values(all_changes), leases });
}
async apply_replication(msg)
{
this.mod_revision = msg.header.revision;
const notifications = [];
if ((msg.leases||[]).length)
{
for (const lease of msg.leases)
{
if (lease.ttl)
{
this.load_lease(lease);
}
else
{
this._sync_revoke_lease(lease.id, notifications, this.mod_revision);
}
}
}
if ((msg.events||[]).length)
{
for (const ev of msg.events)
{
if (ev.value == null)
{
this._delete_range({ key: ev.key }, ev.mod_revision, notifications);
}
else
{
this._put({ key: ev.key, value: ev.value, lease: ev.lease }, ev.mod_revision, notifications);
}
}
}
if (this.replicate)
{
await this.notify_replicator(notifications, msg.leases);
}
this._notify(notifications);
}
// forget deletions before compact_revision
compact(compact_revision)
{
this._compact(compact_revision, this.state);
this.compact_revision = compact_revision;
}
_compact(compact_revision, cur)
{
for (const key in cur.children||{})
{
const child = cur.children[key];
this._compact(compact_revision, child);
if (emptyObj(child.children) && child.value == null && child.mod_revision < compact_revision)
{
delete cur.children[key];
}
}
}
api_create_watch(req, send)
{
const { parts, all } = this._get_range(req);
if (req.start_revision && this.compact_revision && this.compact_revision > req.start_revision)
{
// Deletions up to this.compact_revision are forgotten
return { compact_revision: this.compact_revision };
}
let watch_id = req.watch_id;
if (watch_id instanceof Object)
{
throw new RequestError(400, 'invalid watch_id');
}
if (!watch_id)
{
watch_id = ++this.watcher_id;
}
if (!this.watchers[watch_id])
{
this.watchers[watch_id] = {
paths: [],
send,
};
}
this.watchers[watch_id].paths.push(parts);
const { cur } = this._get_subtree(parts, true, false);
if (all)
{
cur.watchers = cur.watchers || [];
cur.watchers.push(watch_id);
}
else
{
cur.key_watchers = cur.key_watchers || [];
cur.key_watchers.push(watch_id);
}
if (req.start_revision && req.start_revision < this.mod_revision)
{
// Send initial changes
const imm = setImmediate(() =>
{
this.active_immediate = this.active_immediate.filter(i => i !== imm);
const events = [];
const { cur } = this._get_subtree([], false, false);
this._get_modified(events, cur, null, req.start_revision);
send({ result: { header: { revision: this.mod_revision }, events } });
});
this.active_immediate.push(imm);
}
return { watch_id, created: true };
}
_get_modified(events, cur, prefix, min_rev)
{
if (cur.mod_revision >= min_rev)
{
const ev = {
type: cur.value == null ? 'DELETE' : 'PUT',
kv: cur.value == null ? { key: this.b64(prefix === null ? '' : prefix) } : {
key: this.b64(prefix),
value: this.b64(cur.value),
mod_revision: cur.mod_revision,
},
};
if (cur.lease)
{
ev.kv.lease = cur.lease;
}
events.push(ev);
}
if (cur.children)
{
for (const k in cur.children)
{
this._get_modified(events, cur.children[k], prefix === null ? k : prefix+'/'+k, min_rev);
}
}
}
api_cancel_watch(watch_id)
{
if (this.watchers[watch_id])
{
for (const parts of this.watchers[watch_id].paths)
{
const { cur } = this._get_subtree(parts, false, false);
if (cur)
{
if (cur.watchers)
{
cur.watchers = cur.watchers.filter(id => id != watch_id);
if (!cur.watchers.length)
cur.watchers = null;
}
if (cur.key_watchers)
{
cur.key_watchers = cur.key_watchers.filter(id => id != watch_id);
if (!cur.key_watchers.length)
cur.key_watchers = null;
}
}
}
delete this.watchers[watch_id];
}
return { canceled: true };
}
_notify(notifications)
{
if (!notifications.length)
{
return;
}
const by_watcher = {};
for (const notif of notifications)
{
const watchers = notif.watchers;
delete notif.watchers;
const conv = { type: ('value' in notif) ? 'PUT' : 'DELETE', kv: notif };
for (const wid of watchers)
{
if (this.watchers[wid])
{
by_watcher[wid] = by_watcher[wid] || { header: { revision: this.mod_revision }, events: {} };
by_watcher[wid].events[notif.key] = conv;
}
}
}
for (const wid in by_watcher)
{
by_watcher[wid].events = Object.values(by_watcher[wid].events);
this.watchers[wid].send({ result: by_watcher[wid] });
}
}
async api_txn({ compare, success, failure })
{
const failed = (compare || []).filter(chk => !this._check(chk)).length > 0;
const responses = [];
const notifications = [];
const next_revision = this.mod_revision + 1;
for (const req of (failed ? failure : success) || [])
{
responses.push(this._txn_action(req, next_revision, notifications));
}
if (this.replicate && notifications.length)
{
// First replicate the change and then notify watchers about it
await this.notify_replicator(notifications);
}
this._notify(notifications);
return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
}
_txn_action(req, cur_revision, notifications)
{
if (req.request_range || req.requestRange)
{
return { response_range: this._range(req.request_range || req.requestRange) };
}
else if (req.request_put || req.requestPut)
{
return { response_put: this._put(req.request_put || req.requestPut, cur_revision, notifications) };
}
else if (req.request_delete_range || req.requestDeleteRange)
{
return { response_delete_range: this._delete_range(req.request_delete_range || req.requestDeleteRange, cur_revision, notifications) };
}
return {};
}
_range(request_range)
{
// FIXME: limit, revision(-), sort_order, sort_target, serializable(-),
// count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
const { parts, all } = this._get_range(request_range);
const { cur } = this._get_subtree(parts, false, false);
const kvs = [];
if (cur)
{
this._get_all(kvs, cur, all, parts.join('/') || null, request_range);
}
return { kvs };
}
_put(request_put, cur_revision, notifications)
{
// FIXME: prev_kv, ignore_value(?), ignore_lease(?)
const parts = this._key_parts(this.de64(request_put.key));
const key = parts.join('/');
const value = this.de64(request_put.value);
const { cur, watchers } = this._get_subtree(parts, true, true);
if (cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
if (!eq(cur.value, value) || cur.lease != request_put.lease)
{
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[key];
}
if (request_put.lease)
{
if (!this.leases[request_put.lease])
{
throw new RequestError(400, 'unknown lease: '+request_put.lease);
}
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;
}
else if (cur.lease)
{
cur.lease = null;
}
this.mod_revision = cur_revision;
cur.version = (cur.version||0) + 1;
cur.mod_revision = cur_revision;
if (cur.value == null)
{
cur.create_revision = cur_revision;
}
cur.value = value;
const notify = { watchers, key: this.b64(key), value: this.b64(value), mod_revision: cur.mod_revision };
if (cur.lease)
{
notify.lease = cur.lease;
}
notifications.push(notify);
}
return {};
}
_delete_range(request_delete_range, cur_revision, notifications)
{
// FIXME: prev_kv
const { parts, all } = this._get_range(request_delete_range);
const { cur, watchers } = this._get_subtree(parts, false, true);
const prevcount = notifications.length;
if (cur)
{
this._delete_all(notifications, watchers, cur, all, parts.join('/') || null, cur_revision);
}
return { deleted: notifications.length-prevcount };
}
_get_all(kvs, cur, all, prefix, req)
{
if (req.limit && kvs.length > req.limit)
{
return;
}
if (cur.value != null)
{
const item = { key: this.b64(prefix === null ? '' : prefix) };
if (!req.keys_only)
{
item.value = this.b64(cur.value);
item.mod_revision = cur.mod_revision;
//item.create_revision = cur.create_revision;
//item.version = cur.version;
if (cur.lease)
{
item.lease = cur.lease;
}
}
kvs.push(item);
}
if (all && cur.children)
{
for (let k in cur.children)
{
this._get_all(kvs, cur.children[k], true, prefix === null ? k : prefix+'/'+k, req);
}
}
}
_delete_all(notifications, watchers, cur, all, prefix, cur_revision)
{
if (cur.value != null)
{
// Do not actually forget the key until the deletion is confirmed by all replicas
// ...and until it's not required by watchers
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[prefix === null ? '' : prefix];
}
cur.value = null;
cur.version = 0;
cur.create_revision = null;
cur.mod_revision = cur_revision;
this.mod_revision = cur_revision;
notifications.push({
watchers: cur.key_watchers ? [ ...watchers, ...cur.key_watchers ] : watchers,
key: this.b64(prefix === null ? '' : prefix),
mod_revision: cur_revision,
});
}
if (all && cur.children)
{
for (let k in cur.children)
{
const subw = cur.children[k].watchers ? [ ...watchers, ...cur.children[k].watchers ] : watchers;
this._delete_all(notifications, subw, cur.children[k], true, prefix === null ? k : prefix+'/'+k, cur_revision);
}
}
}
}
function eq(a, b)
{
if (a instanceof Object || b instanceof Object)
{
return stableStringify(a) === stableStringify(b);
}
return a == b;
}
function emptyObj(obj)
{
if (!obj)
{
return true;
}
for (const k in obj)
{
return false;
}
return true;
}
EtcTree.eq = eq;
module.exports = EtcTree;

View File

@ -1,179 +0,0 @@
const EtcTree = require('./etctree.js');
const tests = {};
let cur_test = '';
const expect = (a, b) =>
{
if (!EtcTree.eq(a, b))
{
process.stderr.write(cur_test+' test:\nexpected: '+JSON.stringify(b)+'\nreal: '+JSON.stringify(a)+'\n'+new Error().stack.replace(/^.*\n.*\n/, '')+'\n');
process.exit(1);
}
};
tests['read/write'] = async () =>
{
const t = new EtcTree();
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [] } } ] }
);
expect(
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 1, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ],
failure: [ { request_range: { key: '/vitastor/config/global' } } ],
}),
{ header: { revision: 1 }, succeeded: false, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
expect(
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 2, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world2' } } } ]
}),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ],
} } ] }
);
expect(
t.dump(false),
{"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}}
);
t.destroy();
};
tests['watch'] = async () =>
{
const t = new EtcTree();
const sent = [];
const send = (event) => sent.push(event);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send),
{ watch_id: 1, created: true }
);
expect(sent, []);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'PUT', kv: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' }, mod_revision: 2 } } ] } } ]);
t.destroy();
};
tests['lease'] = async () =>
{
const t = new EtcTree();
const sent = [];
const send = (event) => sent.push(event);
const leaseID = (await t.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send),
{ watch_id: 1, created: true }
);
expect(sent, []);
const dump = t.dump(false);
const expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":1,"mod_revision":1,"create_revision":1,"value":{"ip":"1.2.3.4"}}}}}}}}}}}},"mod_revision":1,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
await new Promise(ok => setTimeout(ok, 600));
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'DELETE', kv: { key: '/vitastor/osd/state/1', mod_revision: 2 } } ] } } ]);
t.pause_leases();
t.load(dump);
expect(t.dump(false), dump);
const t2 = new EtcTree();
t2.pause_leases();
t2.load(dump);
expect(t2.dump(false), dump);
t.destroy();
t2.destroy();
};
tests['update'] = async () =>
{
const t1 = new EtcTree();
const t2 = new EtcTree();
const leaseID = (await t1.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t2.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.6' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.5' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
let dump2 = t2.dump();
t2.load(t1.dump(), true);
t1.load(dump2, true);
let dump = t2.dump(false);
let expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
expect(t1.dump(false), {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
t1.destroy();
t2.destroy();
};
tests['replicate watcher'] = async () =>
{
const t = new EtcTree();
t.set_replicate_watcher(async () =>
{
throw new Error('replication failed');
});
let thrown = false;
try
{
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] });
}
catch (e)
{
thrown = e;
}
expect(thrown && thrown.message == 'replication failed', true);
t.destroy();
};
(async function()
{
for (cur_test in tests)
{
await tests[cur_test]();
console.log(cur_test+' test: OK');
}
})().catch(console.error);

View File

@ -1,140 +0,0 @@
#!/usr/bin/nodejs
// "Stupid" gossip algorithm simulation tool
function test_simple(options)
{
options.total ||= 100;
options.gossip ||= 4;
options.msgcap ||= 5;
options.update ||= 0;
options.initial ||= 5;
let messages_sent = 0;
let tick = 1;
const known = {};
const lists = {};
const listsv2 = {};
for (let i = 1; i <= options.total; i++)
{
known[i] = {};
lists[i] = [];
for (let j = 1; j <= (options.update ? options.total : options.initial); j++)
{
known[i][j] = 1; // meta version 1
lists[i].push(j);
}
listsv2[i] = [];
}
let cmp_lists;
let cmp_n;
if (options.update)
{
// We want to update <options.update> nodes metadata to version 2
for (let i = 1; i <= options.update; i++)
{
known[i][i] = 2;
listsv2[i].push(i);
}
cmp_lists = listsv2;
cmp_n = options.update;
}
else
{
// We want <options.total-options.initial> to join <options.initial>
for (let i = 1; i <= options.initial; i++)
{
if (!known[i][i])
{
known[i][i] = 1;
lists[i].push(i);
}
for (let alive = options.initial+1; alive <= options.total; alive++)
{
if (!known[i][alive])
{
known[i][alive] = true;
lists[i].push(alive);
}
}
}
cmp_lists = lists;
cmp_n = options.total;
}
let in_sync = 0;
for (let i = 1; i <= options.total; i++)
{
if (cmp_lists[i].length == cmp_n)
{
in_sync++;
}
}
let avg_known = 0;
while (in_sync < options.total)
{
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
for (let i = 1; i <= options.total; i++)
{
const known_i = lists[i];
const send_to = [];
for (let j = 0; j < options.gossip; j++)
{
send_to.push(known_i[0|(Math.random()*known_i.length)]);
}
const send_what = [];
for (let j = 0; j < options.msgcap; j++)
{
// FIXME: Exclude duplicates, exclude <send_to>
send_what.push(known_i[0|(Math.random()*known_i.length)]);
}
for (const alive of send_what)
{
for (const to of send_to)
{
if (!known[to][alive] || known[i][alive] > known[to][alive])
{
known[to][alive] = known[i][alive];
cmp_lists[to].push(alive);
if (cmp_lists[to].length == cmp_n)
{
console.log('node '+to+': tick '+tick);
in_sync++;
}
}
}
}
messages_sent += send_what.length*send_to.length;
}
avg_known = 0;
for (let i = 1; i <= options.total; i++)
{
avg_known += cmp_lists[i].length;
}
avg_known /= options.total;
tick++;
}
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
console.log(messages_sent+' messages sent');
}
const options = {};
for (let i = 2; i < process.argv.length; i++)
{
if (process.argv[i] === '-h' || process.argv[i] === '--help')
{
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+` [OPTIONS]
--gossip 4 how many nodes to gossip with every tick
--msgcap 5 how many nodes to gossip about every tick
--total 1000 total nodes
--update 0 total nodes to update if testing update. if 0 then test joining
--initial 5 initial nodes in sync to test joining (when --update is 0)`);
process.exit();
}
else if (process.argv[i].substr(0, 2) == '--')
{
options[process.argv[i].substr(2)] = 0|process.argv[i+1];
i++;
}
}
test_simple(options);

View File

@ -1,177 +0,0 @@
#!/usr/bin/nodejs
// https://github.com/hashicorp/memberlist simulation tool
class LimQ
{
constructor(retransmit, maxlen)
{
this.buckets = [];
for (let i = 0; i < retransmit; i++)
{
this.buckets.push([]);
}
this.len = 0;
this.maxlen = maxlen;
}
push(item)
{
if (this.len >= this.maxlen)
return;
const b = this.buckets[this.buckets.length-1];
b.push(item);
}
shift(n)
{
let items = [];
let move = [];
for (let i = this.buckets.length-1; i >= 0 && items.length < n; i--)
{
const rm = this.buckets[i].splice(0, n-items.length);
items.push.apply(items, rm);
if (i > 0)
for (const e of rm)
move.push([ e, i-1 ]);
else
this.len -= rm.length;
}
for (const e of move)
{
this.buckets[e[1]].push(e[0]);
}
return items;
}
}
function test_memberlist(options)
{
options.gossip ||= 4;
options.msgcap ||= 5;
options.max_ticks ||= 100000;
options.total ||= 100;
options.retransmit ||= 12;
options.update ||= 0;
options.initial ||= 5;
let tick = 0;
let messages_sent = 0;
const queue = {};
const known = {}; // { node: { other_node: meta_version } }
const lists = {};
const listsv2 = {};
for (let i = 1; i <= options.total; i++)
{
known[i] = {};
lists[i] = [];
for (let j = 1; j <= (options.update ? options.total : options.initial); j++)
{
known[i][j] = 1; // meta version 1
lists[i].push(j);
}
listsv2[i] = [];
queue[i] = new LimQ(options.retransmit, options.max_queue);
}
let cmp_lists;
let cmp_n;
if (options.update)
{
// We want to update <options.update> nodes metadata to version 2
for (let i = 1; i <= options.update; i++)
{
known[i][i] = 2;
listsv2[i].push(i);
queue[i].push(i);
}
cmp_lists = listsv2;
cmp_n = options.update;
}
else
{
// We want <options.total-options.initial> to join <options.initial>
for (let i = 1; i <= options.initial; i++)
{
for (let alive = options.initial+1; alive <= options.total; alive++)
{
known[i][alive] = 1;
lists[i].push(alive);
queue[i].push(alive);
}
}
cmp_lists = lists;
cmp_n = options.total;
}
let in_sync = 0;
for (let i = 1; i <= options.total; i++)
{
if (cmp_lists[i].length == cmp_n)
{
in_sync++;
}
}
let avg_known = 0;
while (in_sync < options.total && tick < options.max_ticks)
{
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
for (let i = 1; i <= options.total; i++)
{
const known_i = lists[i];
for (let g = 0; g < options.gossip; g++)
{
const to = known_i[0|(Math.random()*known_i.length)];
let send_what = queue[i].shift(options.msgcap);
messages_sent += send_what.length;
for (const alive of send_what)
{
if (!known[to][alive] || known[i][alive] > known[to][alive])
{
known[to][alive] = known[i][alive];
cmp_lists[to].push(alive);
queue[to].push(alive);
const cur_updated = cmp_lists[to].length;
if (cur_updated == cmp_n)
{
console.log('node '+to+': synced at tick '+tick);
in_sync++;
}
}
}
}
}
avg_known = 0;
for (let i = 1; i <= options.total; i++)
{
avg_known += cmp_lists[i].length;
}
avg_known /= options.total;
tick++;
}
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
console.log(messages_sent+' messages sent');
}
const options = {};
for (let i = 2; i < process.argv.length; i++)
{
if (process.argv[i] === '-h' || process.argv[i] === '--help')
{
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+` [OPTIONS]
--gossip 4 how many nodes to gossip with every tick
--msgcap 5 how many "alive" messages fits in a single packet (meta size/UDP packet size in memberlist)
--max_ticks 100000 execution limit
--max_queue 1024 queue size limit
--total 100 total nodes
--retransmit 12 retransmission count. by default log(total)*4 in memberlist
--update 0 total nodes to update if testing update. if 0 then test joining
--initial 5 initial nodes in sync to test joining (when --update is 0)`);
process.exit();
}
else if (process.argv[i].substr(0, 2) == '--')
{
options[process.argv[i].substr(2)] = 0|process.argv[i+1];
i++;
}
}
test_memberlist(options);

View File

@ -1,16 +1,25 @@
{
"name": "tinyraft",
"version": "1.0.0",
"version": "1.0.1",
"description": "Tiny & abstract Raft leader election algorithm",
"main": "tinyraft.js",
"scripts": {
"lint": "eslint common.js anticli.js antipersistence.js anticluster.js antietcd.js etctree.js etctree.spec.js tinyraft.js tinyraft.spec.js",
"test": "node etctree.spec.js && node tinyraft.spec.js"
"lint": "eslint tinyraft.js tinyraft.spec.js faketimer.js",
"test": "node tinyraft.spec.js"
},
"repository": {
"type": "git",
"url": "https://git.yourcmc.ru/vitalif/tinyraft"
},
"homepage": "https://git.yourcmc.ru/vitalif/tinyraft",
"bugs": {
"url": "https://git.yourcmc.ru/vitalif/tinyraft/issues"
},
"files": [
"tinyraft.js",
"tinyraft.d.ts",
"README.md"
],
"keywords": [
"raft"
],
@ -22,12 +31,5 @@
"devDependencies": {
"eslint": "^8.0.0",
"eslint-plugin-node": "^11.1.0"
},
"dependencies": {
"ws": "^8.17.0"
},
"bin": {
"antietcd": "./antietcd-app.js",
"anticli": "./anticli.js"
}
}

View File

@ -1,78 +0,0 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: MIT
function stableStringify(obj, opts)
{
if (!opts)
opts = {};
if (typeof opts === 'function')
opts = { cmp: opts };
let space = opts.space || '';
if (typeof space === 'number')
space = Array(space+1).join(' ');
const cycles = (typeof opts.cycles === 'boolean') ? opts.cycles : false;
const cmp = opts.cmp && (function (f)
{
return function (node)
{
return function (a, b)
{
let aobj = { key: a, value: node[a] };
let bobj = { key: b, value: node[b] };
return f(aobj, bobj);
};
};
})(opts.cmp);
const seen = new Map();
return (function stringify (parent, key, node, level)
{
const indent = space ? ('\n' + new Array(level + 1).join(space)) : '';
const colonSeparator = space ? ': ' : ':';
if (node === undefined)
{
return;
}
if (typeof node !== 'object' || node === null)
{
return JSON.stringify(node);
}
if (node instanceof Array)
{
const out = [];
for (let i = 0; i < node.length; i++)
{
const item = stringify(node, i, node[i], level+1) || JSON.stringify(null);
out.push(indent + space + item);
}
return '[' + out.join(',') + indent + ']';
}
else
{
if (seen.has(node))
{
if (cycles)
return JSON.stringify('__cycle__');
throw new TypeError('Converting circular structure to JSON');
}
else
seen.set(node, true);
const keys = Object.keys(node).sort(cmp && cmp(node));
const out = [];
for (let i = 0; i < keys.length; i++)
{
const key = keys[i];
const value = stringify(node, key, node[key], level+1);
if (!value)
continue;
const keyValue = JSON.stringify(key)
+ colonSeparator
+ value;
out.push(indent + space + keyValue);
}
seen.delete(node);
return '{' + out.join(',') + indent + '}';
}
})({ '': obj }, '', obj, 0);
}
module.exports = stableStringify;

View File

@ -19,6 +19,10 @@
// without being re-elected.
// If all priorities are equal (or just zero), the election algorithm
// becomes identical to the basic algorithm without priorities.
//
// (c) Vitaliy Filippov, 2024
// License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
// (https://git.yourcmc.ru/vitalif/vitastor/src/branch/master/VNPL-1.1.txt)
const EventEmitter = require('events');

View File

@ -1,44 +0,0 @@
function vitastor_persist_filter(cfg)
{
const prefix = cfg.vitastor_prefix || '/vitastor';
return (key, value) =>
{
if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/')
{
if (value)
{
try
{
value = JSON.parse(value);
value = JSON.stringify({
bitmap_granularity: value.bitmap_granularity || undefined,
data_block_size: value.data_block_size || undefined,
host: value.host || undefined,
immediate_commit: value.immediate_commit || undefined,
});
}
catch (e)
{
console.error('invalid JSON in '+key+' = '+value+': '+e);
value = {};
}
}
else
{
value = undefined;
}
return value;
}
else if (key.substr(0, prefix.length+'/osd/'.length) == prefix+'/osd/' ||
key.substr(0, prefix.length+'/inode/stats/'.length) == prefix+'/inode/stats/' ||
key.substr(0, prefix.length+'/pg/stats/'.length) == prefix+'/pg/stats/' ||
key.substr(0, prefix.length+'/pool/stats/'.length) == prefix+'/pool/stats/' ||
key == prefix+'/stats')
{
return undefined;
}
return value;
};
}
module.exports = vitastor_persist_filter;