Compare commits

...

54 Commits

Author SHA1 Message Date
Vitaliy Filippov 5140ba96da Add raftchange event to AntiEtcd type definitions 2024-06-01 16:14:44 +03:00
Vitaliy Filippov d442c1d6d7 Add TS type definitions for TinyRaft 2024-06-01 16:14:33 +03:00
Vitaliy Filippov 993a066576 Fix message format (leaderPriority should be priority) 2024-06-01 16:14:13 +03:00
Vitaliy Filippov 244aaa414a Add client docs to readme 2024-06-01 15:08:38 +03:00
Vitaliy Filippov 8c4bbf11d5 Add TLS cert/key support to cli 2024-06-01 15:08:08 +03:00
Vitaliy Filippov 4194fd6137 Add TS type definitions for AntiEtcd 2024-06-01 14:52:35 +03:00
Vitaliy Filippov 0d5d66b8ff Run deletion compaction also in non-clustered mode 2024-06-01 14:47:48 +03:00
Vitaliy Filippov f44e4ca7d4 Add TOC 2024-06-01 14:47:48 +03:00
Vitaliy Filippov a6432216a8 Implement embeddable watch API 2024-06-01 14:47:48 +03:00
Vitaliy Filippov 2d4e83c23c Add readme for AntiEtcd 2024-06-01 14:47:48 +03:00
Vitaliy Filippov 46276e3443 Multiple polishing fixes
- Add support for HTTPS client certificates
- Make requests wait for quorum by default
- Extract vitastor_persist_filter from main code
- Do not use Promise.allSettled
- Throw 501 in etctree on unsupported features
2024-06-01 11:51:50 +03:00
Vitaliy Filippov 2071dacf93 Split antietcd server & embeddable class 2024-05-30 02:24:48 +03:00
Vitaliy Filippov 54b71347bd Add a note about leadershipTimeout 2024-05-29 10:50:15 +03:00
Vitaliy Filippov 0bbc7a2568 Add README 2024-05-26 19:38:34 +03:00
Vitaliy Filippov e95e9d1feb Fix 2 leaders during 1-3 partition test 2024-05-26 19:38:34 +03:00
Vitaliy Filippov cbb528fb30 Fix 1-3 partition test: check for 2 or 3 nodes in quorum 2024-05-26 19:38:34 +03:00
Vitaliy Filippov e12367bc82 Fix leadership timeout: confirm leadership only if quorum is met 2024-05-26 19:38:18 +03:00
Vitaliy Filippov 6b61e16877 Use fake timer for tests 2024-05-26 19:38:17 +03:00
Vitaliy Filippov 9bf5db2272 Send heartbeats to all nodes (not just followers) if leadership expiration is disabled
Otherwise, the following situation may happen:
- Node 1 is the leader and its term is 24
- Node 2 is a follower of 1, its term is 24 too
- Node 3 also thinks he's the leader, his term is 23, and he doesn't know that a new leader is elected
2024-05-17 11:31:01 +03:00
Vitaliy Filippov aba4525570 Fix missing onChange event when following a leader with greater term 2024-05-16 23:26:02 +03:00
Vitaliy Filippov 63610f0298 Fix replication failure checking 2024-05-16 20:15:24 +03:00
Vitaliy Filippov f063826a88 Wait until output is fully flushed because .write() is async in node.js 2024-05-11 12:16:36 +03:00
Vitaliy Filippov bb4935641a Persist right after loading dump 2024-05-09 18:35:30 +03:00
Vitaliy Filippov 84533dc9ef Fix notifications in etctree 2024-05-09 13:51:39 +03:00
Vitaliy Filippov 5f645c5e44 Increase mod_revision on lease operations 2024-05-09 13:51:19 +03:00
Vitaliy Filippov 0c75cd1d63 Fix persistence 2024-05-09 13:50:11 +03:00
Vitaliy Filippov 2e89aa8b17 Return correct content-type and HTTP code for errors 2024-05-09 13:49:39 +03:00
Vitaliy Filippov b4e0ebd600 Persist & replicate in parallel 2024-05-09 13:49:22 +03:00
Vitaliy Filippov 2b6cc135d3 Speedup initial sync 2024-05-09 13:48:55 +03:00
Vitaliy Filippov 5aa81a5c45 Report CLI errors as exit codes 2024-05-09 13:48:35 +03:00
Vitaliy Filippov 1734b422cb Implement deletion compaction 2024-05-08 16:54:11 +03:00
Vitaliy Filippov 325c2bb2d9 Add detailed help 2024-05-07 17:56:57 +03:00
Vitaliy Filippov 5ac46e8363 Implement client 2024-05-07 17:56:57 +03:00
Vitaliy Filippov 75faa73110 Add ESLint 2024-05-07 17:56:57 +03:00
Vitaliy Filippov ca3f156a3e Add forgotten package.json 2024-05-07 17:56:57 +03:00
Vitaliy Filippov 1a77faa510 Implement simple clustering 2024-05-07 17:56:57 +03:00
Vitaliy Filippov 3596ecd92c Implement simple persistence 2024-05-07 16:04:23 +03:00
Vitaliy Filippov e8b600f536 Implement special replication listener for etctree 2024-05-07 16:02:36 +03:00
Vitaliy Filippov 2692f4abc9 Implement update-only load 2024-05-07 16:02:36 +03:00
Vitaliy Filippov 693c49403e Implement value_filter for dump 2024-05-07 15:30:40 +03:00
Vitaliy Filippov b0cc255623 Move lease timer setting to _set_expire 2024-05-07 15:30:40 +03:00
Vitaliy Filippov 4cdbd72ca0 Remove unused on_expire_lease, rename internal methods to _+name 2024-05-07 15:30:40 +03:00
Vitaliy Filippov 74a77a3974 Split txn_action 2024-05-07 15:06:48 +03:00
Vitaliy Filippov 857cf668f2 Fix leader change event 2024-05-07 15:06:21 +03:00
Vitaliy Filippov b559f9b555 Implement simple dump/load and lease pausing 2024-05-01 01:56:38 +03:00
Vitaliy Filippov 0947d0d61a Implement leader priorities, fix changing nodes 2024-04-30 11:48:55 +03:00
Vitaliy Filippov db2cb5c5b1 Add Anti-Etcd - etcd mock, already sufficient to run Vitastor tests 2024-04-30 11:48:55 +03:00
Vitaliy Filippov ef246e1892 Add cmdline params 2024-04-19 13:32:01 +03:00
Vitaliy Filippov 88a7423453 hashicorp/memberlist & stupid gossip simulation tools 2024-04-19 13:07:27 +03:00
Vitaliy Filippov 7e11ac2477 Make node join existing quorum when seeing a VOTE message for larger term
Fixes testAdd and testRestart (with initialTerm = 1000)
2023-09-29 00:52:47 +03:00
Vitaliy Filippov b1da201d76 Add initialTerm to testAdd and make it fail; add a failing testRestart 2023-09-29 00:49:24 +03:00
Vitaliy Filippov c96a762ffc Split long if-elseif into functions 2023-09-29 00:39:29 +03:00
Vitaliy Filippov 1d8dfc861c Add a test for leadership expiration 2023-09-29 00:35:37 +03:00
Vitaliy Filippov 899c06faed I want to make mini-etcd out of it 2023-07-08 02:05:23 +03:00
21 changed files with 4408 additions and 93 deletions

49
.eslintrc.js Normal file
View File

@ -0,0 +1,49 @@
module.exports = {
"env": {
"es6": true,
"node": true
},
"extends": [
"eslint:recommended",
"plugin:node/recommended"
],
"parserOptions": {
"ecmaVersion": 2020
},
"plugins": [
],
"rules": {
"indent": [
"error",
4
],
"brace-style": [
"error",
"allman",
{ "allowSingleLine": true }
],
"linebreak-style": [
"error",
"unix"
],
"semi": [
"error",
"always"
],
"no-useless-escape": [
"off"
],
"no-control-regex": [
"off"
],
"no-empty": [
"off"
],
"no-process-exit": [
"off"
],
"node/shebang": [
"off"
]
}
};

498
ANTIETCD.md Normal file
View File

@ -0,0 +1,498 @@
# AntiEtcd
Simplistic miniature etcd replacement based on [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)
- Embeddable
- REST API only, gRPC is shit and will never be supported
- [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)-based leader election
- Websocket-based cluster communication
- Supports a limited subset of etcd REST APIs
- With optional persistence
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or [VNPL-1.1](https://git.yourcmc.ru/vitalif/vitastor/src/branch/master/VNPL-1.1.txt)
# Contents
- [CLI Usage](#cli-usage)
- [CLI Client](#cli-client)
- [Options](#options)
- [HTTP](#http)
- [Persistence](#persistence)
- [Clustering](#clustering)
- [Embedded Usage](#embedded-usage)
- [About Persistence](#about-persistence)
- [Supported etcd APIs](#supported-etcd-apis)
- [/v3/kv/txn](#v3-kv-txn)
- [/v3/kv/put](#v3-kv-put)
- [/v3/kv/range](#v3-kv-range)
- [/v3/kv/deleterange](#v3-kv-deleterange)
- [/v3/lease/grant](#v3-lease-grant)
- [/v3/lease/keepalive](#v3-lease-keepalive)
- [/v3/lease/revoke or /v3/kv/lease/revoke](#v3-lease-revoke-or-v3-kv-lease-revoke)
- [Websocket-based watch APIs](#websocket-based-watch-apis)
- [HTTP Error Codes](#http-error-codes)
## CLI Usage
```
npm install antietcd
node_modules/.bin/antietcd \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381]
[other options]
```
Antietcd doesn't background itself, so use systemd or start-stop-daemon to run it as a background service.
### CLI Client
```
node_modules/.bin/anticli [OPTIONS] put <key> [<value>]
node_modules/.bin/anticli [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
node_modules/.bin/anticli [OPTIONS] del <key> [-p|--prefix]
```
For `put`, if `<value>` is not specified, it will be read from STDIN.
Options:
<dl>
<dt>--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379</dt>
<dd>Specify HTTP endpoints to connect to</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--timeout 1000</dt>
<dd>Specify request timeout in milliseconds</dd>
</dl>
## Options
### HTTP
<dl>
<dt>--port 2379</dt>
<dd>Listen port</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--ca &lt;ca&gt;</dt>
<dd>Use trusted root certificates from this file.
Specify &lt;ca&gt; = &lt;cert&gt; if your certificate is self-signed.</dd>
<dt>--client_cert_auth 1</dt>
<dd>Require TLS client certificates signed by &lt;ca&gt; or by default CA to connect.</dd>
<dt>--ws_keepalive_interval 30000</dt>
<dd>Client websocket ping (keepalive) interval in milliseconds</dd>
</dl>
### Persistence
<dl>
<dt>--data &lt;filename&gt;</dt>
<dd>Store persistent data in &lt;filename&gt;</dd>
<dt>--persist_interval &lt;milliseconds&gt;</dt>
<dd>Persist data on disk after this interval, not immediately after change</dd>
<dt>--persist_filter ./filter.js</dt>
<dd>Use persistence filter from ./filter.js (or a module). <br />
Persistence filter is a function(cfg) returning function(key, value) run
for every change and returning a new value or undefined to skip persistence.</dd>
<dt>--compact_revisions 1000</dt>
<dd>Number of previous revisions to keep deletion information in memory</dd>
</dl>
### Clustering
<dl>
<dt>--node_id &lt;id&gt;</dt>
<dd>ID of this cluster node</dd>
<dt>--cluster &lt;id1&gt;=&lt;url1&gt;,&lt;id2&gt;=&lt;url2&gt;,...</dt>
<dd>All other cluster nodes</dd>
<dt>--cluster_key &lt;key&gt;</dt>
<dd>Shared cluster key for identification</dd>
<dt>--election_timeout 5000</dt>
<dd>Raft election timeout</dd>
<dt>--heartbeat_timeout 1000</dt>
<dd>Raft leader heartbeat timeout</dd>
<dt>--wait_quorum_timeout 30000</dt>
<dd>Timeout for requests to wait for quorum to come up</dd>
<dt>--leader_priority &lt;number&gt;</dt>
<dd>Raft leader priority for this node (optional)</dd>
<dt>--stale_read 1</dt>
<dd>Allow serving reads from followers. Specify 0 to disallow</dd>
<dt>--reconnect_interval 1000</dt>
<dd>Unavailable peer connection retry interval</dd>
<dt>--dump_timeout 5000</dt>
<dd>Timeout for dump command in milliseconds</dd>
<dt>--load_timeout 5000</dt>
<dd>Timeout for load command in milliseconds</dd>
<dt>--forward_timeout 1000</dt>
<dd>Timeout for forwarding requests from follower to leader in milliseconds</dd>
<dt>--replication_timeout 1000</dt>
<dd>Timeout for replicating requests from leader to follower in milliseconds</dd>
<dt>--compact_timeout 1000</dt>
<dd>Timeout for compaction requests from leader to follower in milliseconds</dd>
</dl>
## Embedded Usage
```js
const AntiEtcd = require('antietcd');
// Configuration may contain all the same options as the CLI, without "--"
// Except that persist_filter should be a callback (key, value) => newValue
const srv = new AntiEtcd({ ...configuration });
// Start server
srv.start();
// Make a local API call in generic style:
let res = await srv.api('kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', { ...params });
// Or function-style:
res = await srv.txn(params);
res = await srv.range(params);
res = await srv.put(params);
res = await srv.deleterange(params);
res = await srv.lease_grant(params);
res = await srv.lease_revoke(params);
res = await srv.lease_keepalive(params);
// Error handling:
try
{
res = await srv.txn(params);
}
catch (e)
{
if (e instanceof AntiEtcd.RequestError)
{
// e.code is HTTP code
// e.message is error message
}
}
// Watch API:
const watch_id = await srv.create_watch(params, (message) => console.log(message));
await srv.cancel_watch(watch_id);
// Stop server
srv.stop();
```
## About Persistence
Persistence is very simple: the full database is dumped into JSON, gzipped and saved as a file.
By default, it is written and fsynced to disk on every change, but it can be configured
to dump the DB to disk at fixed intervals, for example, at most every 500 ms - of course,
at the expense of slightly reduced crash resiliency (example: `--persist_interval 500`).
You can also specify a filter to exclude some data from persistence by using the option
`--persist_filter ./filter.js`. Persistence filter code example:
```js
function example_filter(cfg)
{
// <cfg> contains all command-line options
const prefix = cfg.exclude_keys;
if (!prefix)
{
return null;
}
return (key, value) =>
{
if (key.substr(0, prefix.length) == prefix)
{
// Skip all keys with prefix from persistence
return undefined;
}
if (key === '/statistics')
{
// Remove <unneeded_key> from inside value
const decoded = JSON.parse(value);
return JSON.stringify({ ...decoded, unneeded_key: undefined });
}
return value;
};
}
module.exports = example_filter;
```
## Supported etcd APIs
NOTE: `key`, `value` and `range_end` are always encoded in base64, like in original etcd.
Range requests are only supported across "directories" separated by `/`.
It means that in range requests `key` must always end with `/` and `range_end` must always
end with `0`, and that such request will return a whole subtree of keys.
### /v3/kv/txn
Request:
```ts
type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
serializable?: boolean,
}
```
`serializable` allows serving read-only requests from a follower even if `stale_read` is not enabled.
Response:
```ts
type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
}
```
### /v3/kv/put
Request:
```ts
type PutRequest = {
key: string,
value: string,
lease?: string,
}
```
Other parameters are not supported: prev_kv, ignore_value, ignore_lease.
Response:
```ts
type PutResponse = {
header: { revision: number },
}
```
### /v3/kv/range
Request:
```ts
type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
serializable?: boolean,
}
```
`serializable` allows serving read-only requests from a follower even if `stale_read` is not enabled.
Other parameters are not supported: revision, limit, sort_order, sort_target,
count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision.
Response:
```ts
type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
}
```
### /v3/kv/deleterange
Request:
```ts
type DeleteRangeRequest = {
key: string,
range_end?: string,
}
```
Other parameters are not supported: prev_kv.
Response:
```ts
type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
}
```
### /v3/lease/grant
Request:
```ts
type LeaseGrantRequest = {
ID?: string,
TTL: number,
}
```
Response:
```ts
type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
}
```
### /v3/lease/keepalive
Request:
```ts
type LeaseKeepaliveRequest = {
ID: string,
}
```
Response:
```ts
type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
}
```
### /v3/lease/revoke or /v3/kv/lease/revoke
Request:
```ts
type LeaseRevokeRequest = {
ID: string,
}
```
Response:
```ts
type LeaseRevokeResponse = {
header: { revision: number },
}
```
### Websocket-based watch APIs
Client-to-server message format:
```ts
type ClientMessage =
{ create_request: {
key: string,
range_end?: string,
start_revision?: number,
watch_id?: string,
} }
| { cancel_request: {
watch_id: string,
} }
| { progress_request: {} }
```
Server-to-client message format:
```ts
type ServerMessage = {
result: {
header: { revision: number },
watch_id: string,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' }
```
### HTTP Error Codes
- 400 for invalid requests
- 404 for unsupported API / URL not found
- 405 for non-POST request method
- 501 for unsupported API feature - non-directory range queries and so on
- 502 for server is stopping
- 503 for quorum-related errors - quorum not available and so on

103
README.md Normal file
View File

@ -0,0 +1,103 @@
# TinyRaft
Raft leader election isolated from the rest of the algorithm.
TinyRaft doesn't know anything about replication and doesn't require you to
implement it.
Actual network communication is also abstracted away and hidden behind a simple
callback interface.
The only task of TinyRaft is to elect the leader and guarantee that there is
only one leader at each moment.
TinyRaft can be used:
- As a simple leader election algorithm without replication at all
- As a building block for the standard Raft algorithm if you add log replication
- For other variations of "weaker consensus" if you add another method of replication
Some replication ideas for you:
- Log-less replication: Add a version number for each key in the database
and make the leader synchronize follower databases by simply dumping all
followers' databases with the newest term (followers with older terms should
be ignored), comparing version numbers and making the newest version of each
key win.
- Erasure coding: Suppose you store large values. You can split each value into
N parts, add K parity parts to it using Reed-Solomon codes (ISA-L/jerasure)
and store them on different nodes in the form of Raft-like logs or similar
to the log-less replication with version numbers, and make master synchronize
followers by reconstructing every original value.
## Example Application
TODO: Extract and describe Antietcd.
## Usage
```js
const node = new TinyRaft({
nodes: [ 1, 2, 3 ],
nodeId: 1,
electionTimeout: 5000,
heartbeatTimeout: 1000,
leadershipTimeout: 10000,
initialTerm: 0,
leaderPriority: 0,
send: function(to, msg)
{
// Function to send message <msg> to node with ID <to>
// msg.type is one of TinyRaft.VOTE_REQUEST, TinyRaft.VOTE, TinyRaft.PING, TinyRaft.PONG
// msg.leader is the leader ID or null
// msg.term is the term number
// msg.priority is the optional leadership priority if set in config
},
});
// Watch for election state
node.on('change', (st) =>
{
console.log(
'node '+node.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+
', term '+st.term+(st.state == TinyRaft.LEADER ? ', followers: '+st.followers.join(', ') : '')
);
});
// Start Raft node or start a new election round
node.start();
// Optional; may be called for a follower when it receives a message from a live leader,
// for example, a replication message, and causes the follower to move its round expiration forward
node.markAlive();
// Update cluster node list
node.setNodes([ 1, 2, 3 ]);
// Incoming messages should be fed to TinyRaft like this (from = ID of the sender):
node.onReceive(from, msg);
// Stop Raft node
node.stop();
```
## Additional features
### Leader expiration
Supports leader expiration like in NuRaft:
https://github.com/eBay/NuRaft/blob/master/docs/leadership_expiration.md
When leader expiration is enabled, followers respond to leader heartbeats
(pings) with "pong" messages and if the leader doesn't receive a quorum of
replies in leadershipTimeout - it starts a new round of voting.
### Leadership priorities
Also supports leader priorities, similar to NuRaft but even simpler:
if a node receives a VoteRequest message with larger term but with smaller
priority than its own, it immediately starts a new voting round.
It guarantees that a node with non-maximum priority can't become leader
without being re-elected.
If all priorities are equal (or just zero), the election algorithm
becomes identical to the basic algorithm without priorities.

259
anticli.js Normal file
View File

@ -0,0 +1,259 @@
#!/usr/bin/env node
const fsp = require('fs').promises;
const http = require('http');
const https = require('https');
// Usage text of the CLI; it is written to stderr when -h or --help is passed
const help_text = `CLI for AntiEtcd
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
anticli.js [OPTIONS] put <key> [<value>]
anticli.js [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
anticli.js [OPTIONS] del <key> [-p|--prefix]
Options:
[--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379]
[--cert cert.pem] [--key key.pem] [--timeout 1000]
`;
// Command-line client for AntiEtcd.
// Parses process.argv-style arguments and runs get/put/del commands against
// the etcd-compatible REST API of one of the configured endpoints.
class AntiEtcdCli
{
    // Parse command-line arguments (args[0] and args[1] are skipped, like in process.argv).
    // Returns [ cmd, options ]:
    // - cmd is the list of positional arguments; cmd[0] is the lowercased command name
    //   (get/put/del), the rest are keys/values kept exactly as typed
    // - options is a map of option name (without "--", with "-" replaced by "_") to value
    // Prints a message and exits the process on --help, unknown short options,
    // missing --endpoints value or unsupported command.
    static parse(args)
    {
        const cmd = [];
        const options = {};
        for (let i = 2; i < args.length; i++)
        {
            // Normalized copy used ONLY to match option names: they are case-insensitive
            // and accept both "-" and "_" as word separators
            const arg = args[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
            if (arg === '-h' || arg === '--help')
            {
                process.stderr.write(help_text);
                process.exit();
            }
            else if (arg == '-e' || arg == '--endpoints')
            {
                const value = args[++i];
                if (value === undefined)
                {
                    process.stderr.write('Option '+arg+' requires a value\n');
                    process.exit(1);
                }
                options['endpoints'] = value.split(/\s*[,\s]+\s*/);
            }
            else if (arg == '-p' || arg == '--prefix')
            {
                options['prefix'] = true;
            }
            else if (arg == '-v' || arg == '--print_value_only')
            {
                options['print_value_only'] = true;
            }
            else if (arg == '-k' || arg == '--keys_only')
            {
                options['keys_only'] = true;
            }
            else if (arg[0] == '-' && arg[1] !== '-')
            {
                process.stderr.write('Unknown option '+arg+'\n');
                process.exit(1);
            }
            else if (arg.substr(0, 2) == '--')
            {
                // Generic "--name value" option; the value is taken verbatim
                options[arg.substr(2)] = args[++i];
            }
            else
            {
                // Keys and values are case-sensitive, so positional arguments
                // must be stored as typed, not in their lowercased form
                cmd.push(args[i]);
            }
        }
        if (cmd.length)
        {
            // Only the command name itself is case-insensitive
            cmd[0] = cmd[0].toLowerCase();
        }
        if (!cmd.length || cmd[0] != 'get' && cmd[0] != 'put' && cmd[0] != 'del')
        {
            process.stderr.write('Supported commands: get, put, del. Use --help to see details\n');
            process.exit(1);
        }
        return [ cmd, options ];
    }

    // Execute a parsed command and exit the process with code 0 when it completes.
    // <cmd> and <options> are the values returned by parse().
    async run(cmd, options)
    {
        this.options = options;
        if (!this.options.endpoints)
        {
            this.options.endpoints = [ 'http://localhost:2379' ];
        }
        if (this.options.cert && this.options.key)
        {
            // TLS client certificate is only used when both files are specified
            this.tls = {
                key: await fsp.readFile(this.options.key),
                cert: await fsp.readFile(this.options.cert),
            };
        }
        if (cmd[0] == 'get')
        {
            await this.get(cmd.slice(1));
        }
        else if (cmd[0] == 'put')
        {
            await this.put(cmd[1], cmd.length > 2 ? cmd[2] : undefined);
        }
        else if (cmd[0] == 'del')
        {
            await this.del(cmd.slice(1));
        }
        // wait until output is fully flushed
        await new Promise(ok => process.stdout.write('', ok));
        await new Promise(ok => process.stderr.write('', ok));
        process.exit(0);
    }

    // Read <keys> (or whole "directories" when --prefix is set) in a single transaction
    // and print keys and/or values to stdout, one per line.
    async get(keys)
    {
        if (this.options.prefix)
        {
            keys = keys.map(k => k.replace(/\/+$/, ''));
        }
        // A prefix query is the range [key+'/', key+'0') because '0' directly follows '/' in ASCII
        const txn = { success: keys.map(key => ({ request_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
        const res = await this.request('/v3/kv/txn', txn);
        for (const r of res.responses||[])
        {
            if (r.response_range)
            {
                for (const kv of r.response_range.kvs||[])
                {
                    if (!this.options.print_value_only)
                    {
                        process.stdout.write(de64(kv.key)+'\n');
                    }
                    if (!this.options.keys_only)
                    {
                        process.stdout.write(de64(kv.value)+'\n');
                    }
                }
            }
        }
    }

    // Store <value> under <key>; when <value> is undefined it is read from STDIN.
    // Prints "OK" on success.
    async put(key, value)
    {
        if (value === undefined)
        {
            value = await fsp.readFile(0, { encoding: 'utf-8' });
        }
        const res = await this.request('/v3/kv/put', { key: b64(key), value: b64(value) });
        if (res.header)
        {
            process.stdout.write('OK\n');
        }
    }

    // Delete <keys> (or whole "directories" when --prefix is set) in a single transaction
    // and print the number of deleted keys for each of them.
    async del(keys)
    {
        if (this.options.prefix)
        {
            keys = keys.map(k => k.replace(/\/+$/, ''));
        }
        const txn = { success: keys.map(key => ({ request_delete_range: this.options.prefix ? { key: b64(key+'/'), range_end: b64(key+'0') } : { key: b64(key) } })) };
        const res = await this.request('/v3/kv/txn', txn);
        for (const r of res.responses||[])
        {
            if (r.response_delete_range)
            {
                process.stdout.write(r.response_delete_range.deleted+'\n');
            }
        }
    }

    // POST <body> as JSON to <path> on the first endpoint that answers.
    // Returns the decoded JSON response. Endpoints that fail without any HTTP response
    // are skipped; any other failure is printed to stderr and terminates the process
    // with exit code 1.
    async request(path, body)
    {
        // Command-line values arrive as strings, but http.request() only accepts a numeric timeout
        const timeout = Number(this.options.timeout) || 1000;
        for (const url of this.options.endpoints)
        {
            const cur_url = url.replace(/\/+$/, '')+path;
            const res = await POST(cur_url, this.tls||{}, body, timeout);
            if (res.json)
            {
                if (res.json.error)
                {
                    process.stderr.write(cur_url+': '+res.json.error+'\n');
                    process.exit(1);
                }
                return res.json;
            }
            if (res.body)
            {
                process.stderr.write(cur_url+': '+res.body+'\n');
            }
            if (res.error)
            {
                process.stderr.write(cur_url+': '+res.error+'\n');
                if (!res.response || !res.response.statusCode)
                {
                    // This URL is unavailable
                    continue;
                }
            }
            break;
        }
        process.exit(1);
    }
}
// Send <body> as a JSON POST request to <url> and resolve with the outcome.
// The returned promise never rejects; it resolves with one of:
// - { response, json }        - HTTP 200 with a valid JSON body
// - { response, body, code }  - non-200 status or non-JSON content type
// - { response, error, body } - HTTP 200 with a JSON content type but an unparsable body
// - { error }                 - connection failure, invalid URL, or 'timeout'
// <options> are extra http(s).request() options (for example TLS key/cert).
// <timeout> is the time in milliseconds to wait for the response to start; 0 or empty disables it.
function POST(url, options, body, timeout)
{
    return new Promise(resolve =>
    {
        const body_text = Buffer.from(JSON.stringify(body));
        // http.request() rejects non-numeric timeouts, so normalize strings like '1000'
        const timeout_ms = Number(timeout) > 0 ? Number(timeout) : 0;
        let req = null;
        let timer_id = null;
        let done = false;
        // Resolve at most once and never leave the timer behind
        const finish = (result) =>
        {
            if (done)
            {
                return;
            }
            done = true;
            if (timer_id)
            {
                clearTimeout(timer_id);
                timer_id = null;
            }
            resolve(result);
        };
        if (timeout_ms)
        {
            timer_id = setTimeout(() =>
            {
                const cur_req = req;
                // Mark the request as abandoned so a late response is ignored
                req = null;
                timer_id = null;
                finish({ error: 'timeout' });
                if (cur_req)
                {
                    cur_req.destroy();
                }
            }, timeout_ms);
        }
        try
        {
            // 'https://' is 8 characters long - match the whole scheme to select the right module
            const transport = /^https:\/\//i.test(url) ? https : http;
            req = transport.request(url, { method: 'POST', headers: {
                'Content-Type': 'application/json',
                'Content-Length': body_text.length,
            }, timeout: timeout_ms || undefined, ...options }, (res) =>
            {
                if (!req)
                {
                    return;
                }
                // The timeout only covers the wait for the response to start
                clearTimeout(timer_id);
                timer_id = null;
                let res_body = '';
                res.setEncoding('utf8');
                res.on('error', (error) => finish({ error }));
                res.on('data', chunk => { res_body += chunk; });
                res.on('end', () =>
                {
                    if (res.statusCode != 200 || !/application\/json/i.exec(res.headers['content-type']))
                    {
                        finish({ response: res, body: res_body, code: res.statusCode });
                        return;
                    }
                    try
                    {
                        finish({ response: res, json: JSON.parse(res_body) });
                    }
                    catch (e)
                    {
                        finish({ response: res, error: e, body: res_body });
                    }
                });
            });
        }
        catch (e)
        {
            // Synchronous failures (invalid URL or options) are reported like any other error
            finish({ error: e });
            return;
        }
        req.on('error', (error) => finish({ error }));
        // 'close' also fires after a normal response; finish() ignores it in that case
        req.on('close', () => finish({ error: new Error('Connection closed prematurely') }));
        req.write(body_text);
        req.end();
    });
}
// Encode a string as base64 (etcd REST API transfers keys and values in base64)
function b64(str)
{
    const bytes = Buffer.from(str);
    return bytes.toString('base64');
}
// Decode a base64 string back into its text form
function de64(str)
{
    const bytes = Buffer.from(str, 'base64');
    return bytes.toString();
}
// Entry point: parse argv and run the command.
// Unexpected failures are printed and reported through a non-zero exit code
new AntiEtcdCli().run(...AntiEtcdCli.parse(process.argv)).catch((e) =>
{
    console.error(e);
    process.exit(1);
});

522
anticluster.js Normal file
View File

@ -0,0 +1,522 @@
const ws = require('ws');
const TinyRaft = require('./tinyraft.js');
const { runCallbacks, RequestError } = require('./common.js');
// Error text for raft leader/term mismatches
// NOTE(review): its users are outside the visible part of the file - confirm where it is reported
const LEADER_MISMATCH = 'raft leader/term mismatch';
// NOTE(review): the values 1, 2, 4 look like combinable bit flags describing how a request
// should be routed/checked - confirm against the code that uses them
const LEADER_ONLY = 1;
const NO_WAIT_QUORUM = 2;
const READ_FROM_FOLLOWER = 4;
class AntiCluster
{
// Initialize cluster state for <antietcd>, start TinyRaft leader election
// and begin connecting to all configured peers.
// Throws if node_id or cluster_key is missing in the configuration.
constructor(antietcd)
{
    const cfg = antietcd.cfg;
    this.antietcd = antietcd;
    this.cfg = cfg;
    this.cluster_connections = {};
    this.last_request_id = 1;
    this.subrequests = {};
    this.synced = false;
    this.wait_sync = [];
    if (!(cfg.node_id && cfg.cluster_key))
    {
        throw new Error('node_id and cluster_key are required in configuration if cluster is set');
    }
    if (!(cfg.cluster instanceof Object))
    {
        // Convert "id1=url1,id2=url2,..." into { id1: url1, id2: url2, ... }
        const nodes = {};
        for (const item of (''+cfg.cluster).trim().split(/[\s,]*,[\s,]*/))
        {
            const parts = item.split(/\s*=\s*/);
            nodes[parts[0]] = parts[1];
        }
        cfg.cluster = nodes;
    }
    const raft_options = {
        nodes: Object.keys(cfg.cluster),
        nodeId: cfg.node_id,
        heartbeatTimeout: cfg.heartbeat_timeout,
        electionTimeout: cfg.election_timeout,
        leaderPriority: cfg.leader_priority||undefined,
        initialTerm: antietcd.stored_term,
        send: (to, msg) => this._sendRaftMessage(to, msg),
    };
    this.raft = new TinyRaft(raft_options);
    this.raft.on('change', (event) => this._handleRaftChange(event));
    this.raft.start();
    // Connect to all nodes and reconnect forever
    for (const node_id in cfg.cluster)
    {
        this.connectToNode(node_id);
    }
}
connectToNode(node_id)
{
if (node_id != this.cfg.node_id && this.cfg.cluster[node_id] &&
(!this.cluster_connections[node_id] || !this.antietcd.clients[this.cluster_connections[node_id]]))
{
const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws'), this.antietcd.tls);
const client_id = this.antietcd._startWebsocket(socket, () => setTimeout(() => this.connectToNode(node_id), this.cfg.reconnect_interval||1000));
this.cluster_connections[node_id] = client_id;
socket.on('open', () =>
{
if (this.antietcd.clients[client_id])
{
this.antietcd.clients[client_id].ready = true;
this.antietcd.clients[client_id].raft_node_id = node_id;
this.antietcd.clients[client_id].addr = socket._socket.remoteAddress+':'+socket._socket.remotePort;
socket.send(JSON.stringify({ identify: { key: this.cfg.cluster_key, node_id: this.cfg.node_id } }));
this.raft.start();
}
});
}
}
_peerRequest(client, request, timeout)
{
const request_id = this.last_request_id++;
request.request_id = request_id;
client.socket.send(JSON.stringify(request));
const req = this.subrequests[request_id] = { client_id: client.id };
const promise = new Promise(ok => req.cb = ok);
req.timer_id = setTimeout(() => this._completeRequest(null, request_id, { error: 'timeout' }), timeout);
return promise;
}
async replicateChange(msg)
{
if (this.raft.state !== TinyRaft.LEADER)
{
return;
}
const mod_revision = this.antietcd.etctree.mod_revision;
await this._requestFollowers({ replicate: msg }, this.cfg.replication_timeout||1000);
// We have a guarantee that all revisions before mod_revision are applied by followers,
// because replication messages are either processed synchronously or serialized in
// AntiPersistence against <wait_persist>
this.sync_revision = mod_revision;
if (this.sync_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{
const revision = this.sync_revision - (this.cfg.compact_revisions||1000);
await this._requestFollowers({ compact: { revision } }, this.cfg.compact_timeout||1000);
this.antietcd.etctree.compact(revision);
}
}
_log(msg)
{
if (this.cfg.log_level > 0)
{
console.log(msg);
}
}
// Send <msg> to every current raft follower and wait for all replies.
// <msg> is stamped with the current raft term before sending.
// <timeout> is the per-peer reply timeout in milliseconds (passed to _peerRequest).
// Throws RequestError(503) and restarts the election if any follower has no
// connection, returns no result or returns an error (including a timeout).
async _requestFollowers(msg, timeout)
{
msg.term = this.raft.term;
const followers = this.raft.followers;
// First pass: verify that all followers are reachable before sending anything,
// so that nothing is sent when the request is going to fail anyway
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const client = this._getPeer(follower);
if (!client)
{
// One of peers is unavailable - immediate failure, request should be retried
this._log('Lost peer connection during replication - restarting election');
this.raft.start();
throw new RequestError(503, 'Peer connection is lost, please retry request');
}
}
}
// Second pass: send the message to all followers in parallel
const promises = [];
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const client = this._getPeer(follower);
const promise = this._peerRequest(client, msg, timeout);
promises.push(promise);
}
}
const results = await Promise.all(promises);
// Third pass: check replies; <i> indexes <results>, which only has entries
// for followers other than this node, in the same order as <followers>
let i = 0;
for (const follower of followers)
{
if (follower != this.cfg.node_id)
{
const result = results[i];
if (!result || result.error)
{
// One of peers is unavailable - immediate failure, request should be retried
this._log('Replication failed ('+follower+': '+(result ? result.error : 'no result')+') - restarting election');
this.raft.start();
throw new RequestError(503, 'Replication failed, please retry request');
}
i++;
}
}
}
_completeRequest(client_id, request_id, result)
{
const req = this.subrequests[request_id];
if (!req || client_id && req.client_id != client_id)
{
return;
}
delete this.subrequests[request_id];
if (req.timer_id)
{
clearTimeout(req.timer_id);
req.timer_id = null;
}
req.cb(result);
}
_handleRaftChange(event)
{
this.antietcd.emit('raftchange', event);
this._log(
'Raft '+this.cfg.node_id+': '+(event.state == TinyRaft.FOLLOWER ? 'following '+event.leader : event.state)+
', term '+event.term+(event.state == TinyRaft.LEADER ? ', followers: '+event.followers.join(', ') : '')
);
if (event.state == TinyRaft.LEADER)
{
// (Re)sync with the new set of followers
this._resync(event.followers);
this.antietcd.etctree.resume_leases();
}
else
{
this.synced = false;
this.antietcd.etctree.pause_leases();
}
}
// Start or update database resynchronization with the given set of followers.
// Marks the node as not synced, requests a database dump from every follower
// that has no dump entry yet and an available connection, forgets dumps of nodes
// that are no longer followers, and then calls _continueResync().
_resync(followers)
{
this.synced = false;
// Resync state is created once and reused when the follower set changes
if (!this.resync_state)
{
this.resync_state = {
dumps: {},
loads: {},
};
}
const seen = {};
for (const f of followers)
{
seen[f] = true;
if (f != this.cfg.node_id && !(f in this.resync_state.dumps))
{
const client = this._getPeer(f);
if (client)
{
// null marks the dump as requested but not received yet
this.resync_state.dumps[f] = null;
this._peerRequest(client, { request: {}, handler: 'dump' }, this.cfg.dump_timeout||5000).then(res =>
{
// Ignore the reply if resync was finished or the node was dropped from it in the meantime
if (this.resync_state && client.raft_node_id &&
(client.raft_node_id in this.resync_state.dumps))
{
if (res.error)
{
console.error(client.raft_node_id+' dump failed with error: '+res.error);
}
else
{
this._log('Got dump from '+client.raft_node_id+' with stored term '+res.term);
}
// A failed dump is stored as null again, i.e. it stays in the "not received" state
this.resync_state.dumps[client.raft_node_id] = res.error ? null : res;
this._continueResync();
}
});
}
}
}
// Drop dumps of nodes which are not in the follower list anymore
for (const f in this.resync_state.dumps)
{
if (!seen[f])
{
delete this.resync_state.dumps[f];
}
}
this._continueResync();
}
// Second resync stage, run once every requested dump has arrived:
// pick the maximum stored term among all dumps (including the local one),
// merge the databases of the nodes having that term into the local tree and
// send the merged database to every other node as a 'load' request.
// Throws Error if no maximum term can be determined.
_continueResync()
{
if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0)
{
// Some dump(s) are still pending
return;
}
// Add the local database as one more dump, keyed by our own node ID
this.resync_state.dumps[this.cfg.node_id] = { ...this.antietcd.etctree.dump(), term: this.antietcd.stored_term };
let max_term = -1, with_max = [];
for (const follower in this.resync_state.dumps)
{
const dump = this.resync_state.dumps[follower];
if (dump.term > max_term)
{
max_term = dump.term;
with_max = [ follower ];
}
else if (dump.term == max_term)
{
with_max.push(follower);
}
}
if (max_term < 0 || with_max.length == 0)
{
throw new Error('BUG: no max term during resync');
}
this._log('Local term '+this.antietcd.stored_term+', max follower term '+max_term+' at nodes '+with_max.join(', '));
// The local database is already in the local tree, only remote dumps have to be loaded
with_max = with_max.filter(w => w != this.cfg.node_id);
// Merge databases of all nodes with maximum term
// Force other nodes to replicate the merged DB, throwing away their own states
for (let i = 0; i < with_max.length; i++)
{
// Only the first dump is loaded with update_only = false, and only when the
// local stored term differs from the maximum; all other dumps are loaded with update_only = true
const update_only = !(i == 0 && this.antietcd.stored_term != max_term);
this._log(update_only ? 'Updating database from node '+with_max[i]+' state' : 'Copying node '+with_max[i]+' state');
this.antietcd.etctree.load(this.resync_state.dumps[with_max[i]], update_only);
}
// Number of 'load' requests sent to peers
let wait = 0;
const load_request = { term: this.raft.term, load: this.antietcd.etctree.dump() };
for (const follower in this.resync_state.dumps)
{
if (follower != this.cfg.node_id)
{
const dump = this.resync_state.dumps[follower];
// NOTE(review): max_term is the maximum over all dumps, so this condition looks always true
if (dump.term <= max_term)
{
const client = this._getPeer(follower);
if (!client)
{
this._log('Lost peer connection during resync - restarting election');
this.raft.start();
return;
}
this._log('Copying state to '+follower);
// The load state object gets its <result> when the peer replies; _finishResync() checks it
const loadstate = this.resync_state.loads[follower] = {};
wait++;
this._peerRequest(client, load_request, this.cfg.load_timeout||5000).then(res =>
{
loadstate.result = res;
this._finishResync();
});
}
}
}
if (!wait)
{
// Nothing was sent, so there is nothing to wait for
this._finishResync();
}
}
_finishResync()
{
if (Object.values(this.resync_state.dumps).filter(d => !d).length > 0 ||
Object.values(this.resync_state.loads).filter(d => !d.result).length > 0)
{
return;
}
// All current peers have copied the database, we can proceed
this.antietcd.stored_term = this.raft.term;
this.synced = true;
runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with followers, new term is '+this.raft.term);
}
_isWrite(path, data)
{
if (path == 'kv_txn')
{
return ((!data.compare || !data.compare.length) &&
(!data.success || !data.success.filter(f => f.request_put || f.requestPut || f.request_delete_range || f.requestDeleteRange).length) &&
(!data.failure || !data.failure.filter(f => f.request_put || f.requestPut || f.request_delete_range || f.requestDeleteRange).length));
}
return path != 'kv_range';
}
async checkRaftState(path, leaderonly, data)
{
if (!this.raft)
{
return null;
}
if (leaderonly == LEADER_ONLY && this.raft.state != TinyRaft.LEADER)
{
throw new RequestError(503, 'Not leader');
}
if (leaderonly == NO_WAIT_QUORUM && this.raft.state == TinyRaft.CANDIDATE)
{
throw new RequestError(503, 'Quorum not available');
}
if (!this.synced)
{
// Wait for quorum / initial sync with timeout
await new Promise((ok, no) =>
{
this.wait_sync.push(ok);
setTimeout(() =>
{
this.wait_sync = this.wait_sync.filter(cb => cb != ok);
no(new RequestError(503, 'Quorum not available'));
}, this.cfg.wait_quorum_timeout||30000);
});
}
if (this.raft.state == TinyRaft.FOLLOWER &&
(this._isWrite(path, data) || !this.cfg.stale_read && !(leaderonly & READ_FROM_FOLLOWER)))
{
// Forward to leader
return await this._forwardToLeader(path, data);
}
return null;
}
async _forwardToLeader(handler, data)
{
const client = this._getPeer(this.raft.leader);
if (!client)
{
throw new RequestError(503, 'Leader is unavailable');
}
return await this._peerRequest(client, { handler, request: data }, this.cfg.forward_timeout||1000);
}
handleWsMsg(client, msg)
{
if (msg.raft)
{
if (client.raft_node_id)
{
this.raft.onReceive(client.raft_node_id, msg.raft);
}
}
else if (msg.identify)
{
if (msg.identify.key === this.cfg.cluster_key &&
msg.identify.node_id != this.cfg.node_id)
{
client.raft_node_id = msg.identify.node_id;
this._log('Got a connection from '+client.raft_node_id);
}
}
else if (msg.load)
{
this._handleLoadMsg(client, msg).catch(console.error);
}
else if (msg.replicate)
{
this._handleReplicateMsg(client, msg).catch(console.error);
}
else if (msg.request)
{
this._handleRequestMsg(client, msg).catch(console.error);
}
else if (msg.reply)
{
this._completeRequest(client.id, msg.request_id, msg.reply);
}
else if (msg.compact)
{
this._handleCompactMsg(client, msg);
}
}
async _handleRequestMsg(client, msg)
{
try
{
const res = await this.antietcd.api(msg.handler, msg.request);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: res }));
}
catch (e)
{
console.error(e);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: e.message } }));
}
}
async _handleLoadMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
this.antietcd.etctree.load(msg.load);
if (this.antietcd.persistence)
{
await this.antietcd.persistence.persist();
}
this.antietcd.stored_term = msg.term;
this.synced = true;
runCallbacks(this, 'wait_sync', []);
this._log('Synchronized with leader, new term is '+msg.term);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
async _handleReplicateMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
await this.antietcd.etctree.apply_replication(msg.replicate);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
_handleCompactMsg(client, msg)
{
if (client.raft_node_id && this.raft.state == TinyRaft.FOLLOWER &&
this.raft.leader === client.raft_node_id && this.raft.term == msg.term)
{
this.antietcd.etctree.compact(msg.compact.revision);
this._log('Compacted deletions up to '+msg.compact.revision);
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: {} }));
}
else
{
client.socket.send(JSON.stringify({ request_id: msg.request_id, reply: { error: LEADER_MISMATCH } }));
}
}
_getPeer(to)
{
if (to == this.cfg.node_id)
{
throw new Error('BUG: attempt to get connection to self');
}
const client_id = this.cluster_connections[to];
if (!client_id)
{
return null;
}
const client = this.antietcd.clients[client_id];
if (!client || !client.ready)
{
return null;
}
return client;
}
_sendRaftMessage(to, msg)
{
const client = this._getPeer(to);
if (client)
{
client.socket.send(JSON.stringify({ raft: msg }));
}
}
}
// Expose the request flags as static properties of the class. They are bit flags:
// callers combine them with | to build the <leaderonly> argument of checkRaftState()
AntiCluster.LEADER_ONLY = LEADER_ONLY;
AntiCluster.NO_WAIT_QUORUM = NO_WAIT_QUORUM;
AntiCluster.READ_FROM_FOLLOWER = READ_FROM_FOLLOWER;
module.exports = AntiCluster;

118
antietcd-app.js Normal file
View File

@ -0,0 +1,118 @@
#!/usr/bin/env node
const AntiEtcd = require('./antietcd.js');
// Usage text written to stderr by parse() for -h / --help.
// NOTE: a backslash directly followed by a newline is a line continuation inside
// a template literal, so the "Usage" lines ending with \ are emitted as one line.
// parse() converts dashes in option names to underscores, so --persist-filter
// and --persist_filter name the same option.
const help_text = `Miniature etcd replacement based on TinyRaft
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
${process.argv[0]} ${process.argv[1]} \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist-filter ./filter.js] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381] \
[other options]
Supported etcd REST APIs:
/v3/kv/txn /v3/kv/put /v3/kv/range /v3/kv/deleterange
/v3/lease/grant /v3/lease/keepalive /v3/lease/revoke /v3/kv/lease/revoke
websocket-based watch API (create_request, cancel_request, progress_request)
Options:
HTTP:
--port 2379
Listen port
--cert <cert>
Use TLS with this certificate file (PEM format)
--key <key>
Use TLS with this key file (PEM format)
--ca <ca>
Use trusted root certificates from this file.
Specify <ca> = <cert> if your certificate is self-signed.
--client_cert_auth 1
Require TLS client certificates signed by <ca> or by default CA to connect.
--ws_keepalive_interval 30000
Client websocket ping (keepalive) interval in milliseconds
Persistence:
--data <filename>
Store persistent data in <filename>
--persist_interval <milliseconds>
Persist data on disk after this interval, not immediately after change
--persist_filter ./filter.js
Use persistence filter from ./filter.js (or a module).
Persistence filter is a function(cfg) returning function(key, value) ran
for every change and returning a new value or undefined to skip persistence.
--compact_revisions 1000
Number of previous revisions to keep deletion information in memory
Clustering:
--node_id <id>
ID of this cluster node
--cluster <id1>=<url1>,<id2>=<url2>,...
All other cluster nodes
--cluster_key <key>
Shared cluster key for identification
--election_timeout 5000
Raft election timeout
--heartbeat_timeout 1000
Raft leader heartbeat timeout
--wait_quorum_timeout 30000
Timeout for requests to wait for quorum to come up
--leader_priority <number>
Raft leader priority for this node (optional)
--stale_read 1
Allow to serve reads from followers. Specify 0 to disallow
--reconnect_interval 1000
Unavailable peer connection retry interval
--dump_timeout 5000
Timeout for dump command in milliseconds
--load_timeout 5000
Timeout for load command in milliseconds
--forward_timeout 1000
Timeout for forwarding requests from follower to leader in milliseconds
--replication_timeout 1000
Timeout for replicating requests from leader to follower in milliseconds
--compact_timeout 1000
Timeout for compaction requests from leader to follower in milliseconds
`;
// Parse command line options from process.argv into a plain object.
// - every "--name value" pair becomes options[name] = value (value is kept as a string)
// - option names are lowercased and dashes in them are converted to underscores,
//   so --persist-filter and --persist_filter are the same option
// - -h / --help prints the usage text to stderr and exits
// - stale_read is converted to a boolean: true for '1', 'yes' or 'true';
//   it is enabled when the option is not specified at all
// - persist_filter is replaced by the result of calling the module it names with
//   the options object
function parse()
{
    const options = { stale_read: 1 };
    for (let i = 2; i < process.argv.length; i++)
    {
        const arg = process.argv[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
        if (arg === '-h' || arg === '--help')
        {
            process.stderr.write(help_text);
            process.exit();
        }
        else if (arg.startsWith('--'))
        {
            options[arg.slice(2)] = process.argv[++i];
        }
    }
    // Compare as strings: the built-in default is the number 1, while values
    // coming from the command line are always strings
    options['stale_read'] = [ '1', 'yes', 'true' ].includes(String(options['stale_read']));
    if (options['persist_filter'])
    {
        options['persist_filter'] = require(options['persist_filter'])(options);
    }
    return options;
}
// Create the server from command line options
const antietcd = new AntiEtcd(parse());
// Set exit hook: stop the server, then exit with code 0
async function on_stop_cb()
{
    await antietcd.stop();
    process.exit(0);
}
for (const signal of [ 'SIGINT', 'SIGTERM', 'SIGQUIT' ])
{
    process.on(signal, on_stop_cb);
}
// Startup errors are only logged
antietcd.start().catch(console.error);

155
antietcd.d.ts vendored Normal file
View File

@ -0,0 +1,155 @@
import type { EventEmitter } from 'events';
import type { TinyRaftEvents } from './tinyraft';
// Events emitted by AntiEtcd.
// 'raftchange' re-emits the 'change' event of TinyRaft (clustered mode)
export type AntiEtcdEvents = {
raftchange: TinyRaftEvents['change'],
};
// Embeddable etcd-like server
export class AntiEtcd extends EventEmitter<AntiEtcdEvents>
{
    // cfg holds the same options as the command line (port, data, cluster, ...)
    constructor(cfg: object);
    // Load persisted data (if configured) and start listening
    start(): Promise<void>;
    // Wait for in-flight HTTP requests and persist data (if configured)
    stop(): Promise<void>;
    // Generic entry point: run API handler <path> with <params>
    api(path: 'kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', params: object): Promise<object>;
    // Typed wrappers around api()
    txn(params: TxnRequest): Promise<TxnResponse>;
    range(params: RangeRequest): Promise<RangeResponse>;
    put(params: PutRequest): Promise<PutResponse>;
    deleterange(params: DeleteRangeRequest): Promise<DeleteRangeResponse>;
    lease_grant(params: LeaseGrantRequest): Promise<LeaseGrantResponse>;
    lease_revoke(params: LeaseRevokeRequest): Promise<LeaseRevokeResponse>;
    lease_keepalive(params: LeaseKeepaliveRequest): Promise<LeaseKeepaliveResponse>;
    // Create a watch; resolves to the watch ID to pass to cancel_watch().
    // The callback parameter must be named: "(ServerMessage) => void" would declare
    // a parameter called ServerMessage with an implicit "any" type
    create_watch(params: WatchCreateRequest, callback: (msg: ServerMessage) => void): Promise<string|number>;
    // Cancel a watch created by create_watch()
    cancel_watch(watch_id: string|number): Promise<void>;
}
// Transaction request (kv_txn).
// compare: conditions on a single key. Without <result> the comparison is an
// equality check; result "LESS" checks that the stored value is less than the given one.
// success / failure: operation lists; presumably (etcd semantics, not visible in
// this file) <success> runs when all comparisons hold and <failure> otherwise.
export type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
// when set, the read may be served by a follower in clustered mode
serializable?: boolean,
};
// Transaction response: one entry in <responses> per executed operation
export type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
};
// Put request (kv_put, or request_put inside a transaction)
export type PutRequest = {
key: string,
value: string,
// ID of the lease to attach the key to
lease?: string,
};
// Put response
export type PutResponse = {
header: { revision: number },
};
// Range (read) request (kv_range, or request_range inside a transaction).
// Only directory ranges are supported: when range_end is given, key must end
// with "/" and range_end must be the same string with the last "/" replaced by "0"
export type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
// when set, the read may be served by a follower in clustered mode
serializable?: boolean,
};
// Range response.
// The first form of <kvs> (keys only) presumably corresponds to keys_only requests - TODO confirm
export type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
};
// Delete request (kv_deleterange, or request_delete_range inside a transaction)
export type DeleteRangeRequest = {
key: string,
range_end?: string,
};
// Delete response
export type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
};
// Lease grant request (lease_grant)
export type LeaseGrantRequest = {
// optional lease ID
ID?: string,
// lease time-to-live; presumably in seconds as in etcd - TODO confirm
TTL: number,
};
// Lease grant response
export type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
};
// Lease keepalive request (lease_keepalive)
export type LeaseKeepaliveRequest = {
ID: string,
};
// Lease keepalive response. Unlike the other responses, the payload is wrapped in <result>
export type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
};
// Lease revoke request (lease_revoke)
export type LeaseRevokeRequest = {
ID: string,
};
// Lease revoke response
export type LeaseRevokeResponse = {
header: { revision: number },
};
// Watch creation request, used both by create_watch() and by the websocket
// create_request message
export type WatchCreateRequest = {
key: string,
range_end?: string,
start_revision?: number,
// optional caller-chosen watch ID; a server-side ID is used when omitted
watch_id?: string|number,
}
// Messages a watch client may send over the websocket.
// cancel_request.watch_id accepts the same ID types as WatchCreateRequest.watch_id
// and ServerMessage.result.watch_id
export type ClientMessage =
    { create_request: WatchCreateRequest }
    | { cancel_request: { watch_id: string|number } }
    | { progress_request: {} };
// Messages sent by the server over the websocket.
// - { result } with created: true acknowledges a create_request
// - { result } with canceled: true acknowledges a cancel_request
// - { error: 'bad-json' } is sent when a received message is not valid JSON
// - { error: 'empty-message' } is sent when a received message parses to a falsy value
// NOTE(review): the reply to progress_request contains only <header>, without watch_id
export type ServerMessage = {
result: {
header: { revision: number },
watch_id: string|number,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' };

548
antietcd.js Normal file
View File

@ -0,0 +1,548 @@
#!/usr/bin/node
const fsp = require('fs').promises;
const { URL } = require('url');
const http = require('http');
const https = require('https');
const EventEmitter = require('events');
const ws = require('ws');
const EtcTree = require('./etctree.js');
const AntiPersistence = require('./antipersistence.js');
const AntiCluster = require('./anticluster.js');
const { runCallbacks, RequestError } = require('./common.js');
// Embeddable etcd-like key-value server.
// Serves a subset of the etcd v3 REST API over HTTP(S) plus a websocket-based
// watch API, optionally with on-disk persistence (cfg.data) and clustering
// (cfg.cluster). The same APIs are available to the embedding program through
// api(), the typed wrappers and create_watch() / cancel_watch().
class AntiEtcd extends EventEmitter
{
    // cfg: configuration object with the same option names as the command line
    constructor(cfg)
    {
        super();
        // Connected websocket clients by numeric ID
        this.clients = {};
        // ID for the next websocket client
        this.client_id = 1;
        // Keys and values pass through the tree base64-encoded
        this.etctree = new EtcTree(true);
        this.persistence = null;
        this.cluster = null;
        this.stored_term = 0;
        this.cfg = cfg;
        this.loading = false;
        this.stopped = false;
        // Number of HTTP requests being processed, and callbacks waiting for it to reach 0
        this.inflight = 0;
        this.wait_inflight = [];
        // Watches created through create_watch(): caller-visible ID => tree watch ID
        this.api_watches = {};
    }

    // Load persisted data (if cfg.data is set), set up clustering (if cfg.cluster
    // is set) and start the HTTP or HTTPS (if cfg.cert is set) and websocket servers
    async start()
    {
        if (this.cfg.data || this.cfg.cluster)
        {
            this.etctree.set_replicate_watcher(msg => this._persistAndReplicate(msg));
        }
        if (this.cfg.data)
        {
            this.persistence = new AntiPersistence(this);
            // Load data from disk
            await this.persistence.load();
        }
        if (this.cfg.cluster)
        {
            this.cluster = new AntiCluster(this);
        }
        if (this.cfg.cert)
        {
            this.tls = {
                key: await fsp.readFile(this.cfg.key),
                cert: await fsp.readFile(this.cfg.cert),
            };
            if (this.cfg.ca)
            {
                this.tls.ca = await fsp.readFile(this.cfg.ca);
            }
            if (this.cfg.client_cert_auth)
            {
                this.tls.requestCert = true;
            }
            this.server = https.createServer(this.tls, (req, res) => this._handleRequest(req, res));
        }
        else
        {
            this.server = http.createServer((req, res) => this._handleRequest(req, res));
        }
        this.wss = new ws.WebSocketServer({ server: this.server });
        // eslint-disable-next-line no-unused-vars
        this.wss.on('connection', (conn, req) => this._startWebsocket(conn, null));
        this.server.listen(this.cfg.port || 2379);
    }

    // Reject new API calls, wait until all in-flight HTTP requests complete
    // and persist the database (if persistence is enabled).
    // Calling stop() more than once is a no-op
    async stop()
    {
        if (this.stopped)
        {
            return;
        }
        this.stopped = true;
        // Wait until all requests complete
        while (this.inflight > 0)
        {
            await new Promise(ok => this.wait_inflight.push(ok));
        }
        if (this.persistence)
        {
            await this.persistence.persist();
        }
    }

    // Replication watcher of the tree: replicate change <msg> to followers
    // and/or persist it. Rejects if either of the two fails.
    // Without clustering it also compacts deletion information when more than
    // 2 * cfg.compact_revisions (default 1000) revisions have accumulated
    async _persistAndReplicate(msg)
    {
        const pending = [];
        if (this.cluster)
        {
            // We have to guarantee that replication is processed sequentially
            // So we have to send messages without first awaiting for anything!
            pending.push(this.cluster.replicateChange(msg));
        }
        if (this.persistence)
        {
            pending.push(this.persistence.persistChange(msg));
        }
        if (pending.length == 1)
        {
            await pending[0];
        }
        else if (pending.length > 0)
        {
            // Wait for both, fail (and log) as soon as either one fails
            await Promise.all(pending.map(promise => promise.catch(e =>
            {
                console.error(e);
                throw e;
            })));
        }
        if (!this.cluster)
        {
            // Run deletion compaction without followers
            const mod_revision = this.etctree.mod_revision;
            if (mod_revision - this.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
            {
                const revision = mod_revision - (this.cfg.compact_revisions||1000);
                this.etctree.compact(revision);
            }
        }
    }

    // HTTP request handler: read the whole body, run the API handler and send
    // the reply. The body must be a JSON object sent as application/json.
    // RequestError is reported with its own code, any other error as 500
    _handleRequest(req, res)
    {
        let data = [];
        req.on('data', (chunk) => data.push(chunk));
        req.on('end', async () =>
        {
            this.inflight++;
            data = Buffer.concat(data);
            let body = '';
            let code = 200;
            let ctype = 'text/plain; charset=utf-8';
            let reply;
            try
            {
                if (req.headers['content-type'] != 'application/json')
                {
                    throw new RequestError(400, 'content-type should be application/json');
                }
                body = data.toString();
                try
                {
                    data = data.length ? JSON.parse(data) : {};
                }
                catch (e)
                {
                    throw new RequestError(400, 'body should be valid JSON');
                }
                if (!(data instanceof Object) || data instanceof Array)
                {
                    throw new RequestError(400, 'body should be JSON object');
                }
                reply = await this._runHandler(req, data);
                reply = JSON.stringify(reply);
                ctype = 'application/json';
            }
            catch (e)
            {
                if (e instanceof RequestError)
                {
                    code = e.code;
                    reply = e.message;
                }
                else
                {
                    console.error(e);
                    code = 500;
                    reply = 'Internal error: '+e.message;
                }
            }
            try
            {
                // Access log
                if (this.cfg.log_level > 1)
                {
                    console.log(
                        new Date().toISOString()+
                        ' '+(req.headers['x-forwarded-for'] || (req.socket.remoteAddress + ':' + req.socket.remotePort))+
                        ' '+req.method+' '+req.url+' '+code+'\n '+body.replace(/\n/g, '\\n')+
                        '\n '+reply.replace(/\n/g, '\\n')
                    );
                }
                reply = Buffer.from(reply);
                res.writeHead(code, {
                    'Content-Type': ctype,
                    'Content-Length': reply.length,
                });
                res.write(reply);
                res.end();
            }
            catch (e)
            {
                console.error(e);
            }
            this.inflight--;
            if (!this.inflight)
            {
                // Wake up stop() waiting for in-flight requests
                runCallbacks(this, 'wait_inflight', []);
            }
        });
    }

    // Register a websocket connection as a new client and return its ID.
    // Sets up keepalive pings, message parsing and cleanup on close.
    // <reconnect>, if given, is called after the connection is closed
    _startWebsocket(socket, reconnect)
    {
        const client_id = this.client_id++;
        this.clients[client_id] = {
            id: client_id,
            addr: socket._socket ? socket._socket.remoteAddress+':'+socket._socket.remotePort : '',
            socket,
            alive: true,
            watches: {},
        };
        socket.on('pong', () =>
        {
            // The client may already be removed by the 'close' handler
            const client = this.clients[client_id];
            if (client)
            {
                client.alive = true;
            }
        });
        socket.on('error', e => console.error(e.syscall === 'connect' ? e.message : e));
        const pinger = setInterval(() =>
        {
            if (!this.clients[client_id])
            {
                return;
            }
            if (!this.clients[client_id].alive)
            {
                // No pong since the previous ping
                return socket.terminate();
            }
            this.clients[client_id].alive = false;
            socket.ping(() => {});
        }, this.cfg.ws_keepalive_interval||30000);
        socket.on('message', (msg) =>
        {
            try
            {
                msg = JSON.parse(msg);
            }
            catch (e)
            {
                socket.send(JSON.stringify({ error: 'bad-json' }));
                return;
            }
            if (!msg)
            {
                socket.send(JSON.stringify({ error: 'empty-message' }));
            }
            else
            {
                this._handleMessage(client_id, msg, socket);
            }
        });
        socket.on('close', () =>
        {
            this._unsubscribeClient(client_id);
            clearInterval(pinger);
            delete this.clients[client_id];
            socket.terminate();
            if (reconnect)
            {
                reconnect();
            }
        });
        return client_id;
    }

    // Map an HTTP request to an API handler name and run it.
    // /v3/<a>/<b> is mapped to handler <a>_<b> and requires POST; /dump is mapped to 'dump'.
    // Query parameters leaderonly, serializable and nowaitquorum set the
    // corresponding flags in <data>
    async _runHandler(req, data)
    {
        // v3/kv/txn
        // v3/kv/range
        // v3/kv/put
        // v3/kv/deleterange
        // v3/lease/grant
        // v3/lease/keepalive
        // v3/lease/revoke O_o
        // v3/kv/lease/revoke O_o
        const requestUrl = new URL(req.url, 'http://'+(req.headers.host || 'localhost'));
        if (requestUrl.searchParams.get('leaderonly'))
        {
            data.leaderonly = true;
        }
        if (requestUrl.searchParams.get('serializable'))
        {
            data.serializable = true;
        }
        if (requestUrl.searchParams.get('nowaitquorum'))
        {
            data.nowaitquorum = true;
        }
        try
        {
            if (requestUrl.pathname.substr(0, 4) == '/v3/')
            {
                const path = requestUrl.pathname.substr(4).replace(/\/+$/, '').replace(/\/+/g, '_');
                if (req.method != 'POST')
                {
                    throw new RequestError(405, 'Please use POST method');
                }
                return await this.api(path, data);
            }
            else if (requestUrl.pathname == '/dump')
            {
                return await this.api('dump', data);
            }
            else
            {
                throw new RequestError(404, '');
            }
        }
        catch (e)
        {
            // Replace any 404 with the list of supported APIs
            if ((e instanceof RequestError) && e.code == 404)
            {
                throw new RequestError(404, 'Supported APIs: /v3/kv/txn, /v3/kv/range, /v3/kv/put, /v3/kv/deleterange, '+
                    '/v3/lease/grant, /v3/lease/revoke, /v3/kv/lease/revoke, /v3/lease/keepalive');
            }
            else
            {
                throw e;
            }
        }
    }

    // public generic handler
    // Runs API <path> ('kv_txn', 'kv_range', ...) with request <data>.
    // In clustered mode the raft state is checked first and the request may be
    // forwarded to the leader. Throws RequestError(502) when the server is
    // stopping and RequestError(404) for unknown APIs
    async api(path, data)
    {
        if (this.stopped)
        {
            throw new RequestError(502, 'Server is stopping');
        }
        if (path !== 'dump' && this.cluster)
        {
            const res = await this.cluster.checkRaftState(
                path,
                (data.leaderonly ? AntiCluster.LEADER_ONLY : 0) |
                (data.serializable ? AntiCluster.READ_FROM_FOLLOWER : 0) |
                (data.nowaitquorum ? AntiCluster.NO_WAIT_QUORUM : 0),
                data
            );
            if (res)
            {
                // The request was forwarded, <res> is the leader's reply
                return res;
            }
        }
        const cb = this['_handle_'+path];
        if (cb)
        {
            const res = cb.call(this, data);
            if (res instanceof Promise)
            {
                return await res;
            }
            return res;
        }
        throw new RequestError(404, 'Unsupported API');
    }

    // public wrappers
    async txn(params)
    {
        return await this.api('kv_txn', params);
    }

    async range(params)
    {
        return await this.api('kv_range', params);
    }

    async put(params)
    {
        return await this.api('kv_put', params);
    }

    async deleterange(params)
    {
        return await this.api('kv_deleterange', params);
    }

    async lease_grant(params)
    {
        return await this.api('lease_grant', params);
    }

    async lease_revoke(params)
    {
        return await this.api('lease_revoke', params);
    }

    async lease_keepalive(params)
    {
        return await this.api('lease_keepalive', params);
    }

    // public watch API
    // Create a watch and return its ID: params.watch_id if given, the ID
    // assigned by the tree otherwise. <callback> receives watch messages.
    // Throws RequestError(400) when the watch can't be created
    async create_watch(params, callback)
    {
        const watch = this.etctree.api_create_watch({ ...params, watch_id: null }, callback);
        if (!watch.created)
        {
            throw new RequestError(400, 'Requested watch revision is compacted', { compact_revision: watch.compact_revision });
        }
        const watch_id = params.watch_id || watch.watch_id;
        this.api_watches[watch_id] = watch.watch_id;
        return watch_id;
    }

    // Cancel a watch created by create_watch().
    // Throws RequestError(400) when <watch_id> is unknown
    async cancel_watch(watch_id)
    {
        const mapped_id = this.api_watches[watch_id];
        if (!mapped_id)
        {
            throw new RequestError(400, 'Watch not found');
        }
        this.etctree.api_cancel_watch({ watch_id: mapped_id });
        delete this.api_watches[watch_id];
    }

    // internal handlers
    async _handle_kv_txn(data)
    {
        return await this.etctree.api_txn(data);
    }

    // range, put and deleterange are run as single-operation transactions
    async _handle_kv_range(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_range: data } ] });
        return { header: r.header, ...r.responses[0].response_range };
    }

    async _handle_kv_put(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_put: data } ] });
        return { header: r.header, ...r.responses[0].response_put };
    }

    async _handle_kv_deleterange(data)
    {
        const r = await this.etctree.api_txn({ success: [ { request_delete_range: data } ] });
        return { header: r.header, ...r.responses[0].response_delete_range };
    }

    _handle_lease_grant(data)
    {
        return this.etctree.api_grant_lease(data);
    }

    _handle_lease_revoke(data)
    {
        return this.etctree.api_revoke_lease(data);
    }

    // /v3/kv/lease/revoke is an alias of /v3/lease/revoke
    _handle_kv_lease_revoke(data)
    {
        return this.etctree.api_revoke_lease(data);
    }

    _handle_lease_keepalive(data)
    {
        return this.etctree.api_keepalive_lease(data);
    }

    // Full dump of the tree plus the stored raft term
    // eslint-disable-next-line no-unused-vars
    _handle_dump(data)
    {
        return { ...this.etctree.dump(), term: this.stored_term };
    }

    // Handle a parsed websocket message: watch API requests (create_request,
    // cancel_request, progress_request) are served here, everything else is
    // passed to the cluster (and ignored without clustering)
    _handleMessage(client_id, msg, socket)
    {
        const client = this.clients[client_id];
        if (this.cfg.access_log)
        {
            console.log(new Date().toISOString()+' '+client.addr+' '+(client.raft_node_id || '-')+' -> '+JSON.stringify(msg));
        }
        if (msg.create_request)
        {
            const create_request = msg.create_request;
            // A request reusing a watch ID this client already has is ignored
            if (!create_request.watch_id || !client.watches[create_request.watch_id])
            {
                const watch = this.etctree.api_create_watch(
                    { ...create_request, watch_id: null }, (msg) => socket.send(JSON.stringify(msg))
                );
                if (!watch.created)
                {
                    socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, ...watch } }));
                }
                else
                {
                    create_request.watch_id = create_request.watch_id || watch.watch_id;
                    client.watches[create_request.watch_id] = watch.watch_id;
                    socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: create_request.watch_id, created: true } }));
                }
            }
        }
        else if (msg.cancel_request)
        {
            const mapped_id = client.watches[msg.cancel_request.watch_id];
            if (mapped_id)
            {
                this.etctree.api_cancel_watch({ watch_id: mapped_id });
                delete client.watches[msg.cancel_request.watch_id];
                socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision }, watch_id: msg.cancel_request.watch_id, canceled: true } }));
            }
        }
        else if (msg.progress_request)
        {
            socket.send(JSON.stringify({ result: { header: { revision: this.etctree.mod_revision } } }));
        }
        else
        {
            if (!this.cluster)
            {
                return;
            }
            this.cluster.handleWsMsg(client, msg);
        }
    }

    // Cancel all watches of a websocket client
    _unsubscribeClient(client_id)
    {
        if (!this.clients[client_id])
        {
            return;
        }
        for (const watch_id in this.clients[client_id].watches)
        {
            const mapped_id = this.clients[client_id].watches[watch_id];
            this.etctree.api_cancel_watch({ watch_id: mapped_id });
        }
    }
}
// Expose RequestError as a static property of the exported class
AntiEtcd.RequestError = RequestError;
module.exports = AntiEtcd;

134
antipersistence.js Normal file
View File

@ -0,0 +1,134 @@
const fs = require('fs');
const fsp = require('fs').promises;
const zlib = require('zlib');
const stableStringify = require('./stable-stringify.js');
const EtcTree = require('./etctree.js');
const { de64, runCallbacks } = require('./common.js');
// On-disk persistence for AntiEtcd: the database is stored as a single
// gzip-compressed JSON dump in the file cfg.data.
class AntiPersistence
{
    // antietcd: the owning AntiEtcd instance (its cfg, etctree and stored_term are used)
    constructor(antietcd)
    {
        this.cfg = antietcd.cfg;
        this.antietcd = antietcd;
        // Last filtered value by key, only used with cfg.persist_filter
        this.prev_value = {};
        // Timer of a delayed persist() call, see schedulePersist()
        this.persist_timer = null;
        // null when no persist() is running, otherwise the list of
        // callbacks of persist() calls waiting for the running one
        this.wait_persist = null;
        // true while load() is filling the tree, changes are not persisted then
        this.loading = false;
    }

    // Load the database from cfg.data into the tree and restore the stored term.
    // A missing file is not an error; any other failure is thrown
    async load()
    {
        const err = await new Promise(ok => fs.stat(this.cfg.data, (e) => ok(e)));
        if (!err)
        {
            let data = await fsp.readFile(this.cfg.data);
            data = await new Promise((ok, no) => zlib.gunzip(data, (err, res) => err ? no(err) : ok(res)));
            data = JSON.parse(data);
            this.loading = true;
            try
            {
                this.antietcd.etctree.load(data);
            }
            finally
            {
                // Never leave the flag set, otherwise all later changes would be silently skipped
                this.loading = false;
            }
            this.antietcd.stored_term = data['term'] || 0;
        }
        else if (err.code != 'ENOENT')
        {
            throw err;
        }
    }

    // Persist change <msg> if needed. Nothing is done while loading, for
    // changes without events, and - with cfg.persist_filter - for changes
    // which do not modify any filtered value
    async persistChange(msg)
    {
        if (this.loading)
        {
            return;
        }
        if (!msg.events || !msg.events.length)
        {
            // lease-only changes don't need to be persisted
            return;
        }
        if (this.cfg.persist_filter)
        {
            let changed = false;
            for (const ev of msg.events)
            {
                if (ev.lease)
                {
                    // Values with lease are never persisted
                    const key = de64(ev.key);
                    if (this.prev_value[key] !== undefined)
                    {
                        delete this.prev_value[key];
                        changed = true;
                    }
                }
                else
                {
                    const key = de64(ev.key);
                    const filtered = this.cfg.persist_filter(key, ev.value == null ? undefined : de64(ev.value));
                    if (!EtcTree.eq(filtered, this.prev_value[key]))
                    {
                        this.prev_value[key] = filtered;
                        changed = true;
                    }
                }
            }
            if (!changed)
            {
                return;
            }
        }
        await this.schedulePersist();
    }

    // Persist immediately (and wait for it) without cfg.persist_interval,
    // otherwise make sure that a persist() is scheduled to run after
    // cfg.persist_interval milliseconds and return without waiting for it
    async schedulePersist()
    {
        if (!this.cfg.persist_interval)
        {
            await this.persist();
            return;
        }
        if (!this.persist_timer)
        {
            this.persist_timer = setTimeout(() =>
            {
                this.persist_timer = null;
                this.persist().catch(console.error);
            }, this.cfg.persist_interval);
        }
    }

    // Write the database to cfg.data. The dump is written to <cfg.data>.tmp,
    // synced and then renamed over the target file. Concurrent calls are
    // serialized. A write failure terminates the process with exit code 1
    async persist()
    {
        if (!this.cfg.data)
        {
            return;
        }
        // Wait for the running persist() (if any) to finish
        while (this.wait_persist)
        {
            await new Promise(ok => this.wait_persist.push(ok));
        }
        this.wait_persist = [];
        try
        {
            let dump = this.antietcd.etctree.dump(true);
            dump['term'] = this.antietcd.stored_term;
            dump = stableStringify(dump);
            dump = await new Promise((ok, no) => zlib.gzip(dump, (err, res) => err ? no(err) : ok(res)));
            const fh = await fsp.open(this.cfg.data+'.tmp', 'w');
            await fh.writeFile(dump);
            await fh.sync();
            await fh.close();
            await fsp.rename(this.cfg.data+'.tmp', this.cfg.data);
        }
        catch (e)
        {
            console.error('Error persisting data to disk: '+e);
            process.exit(1);
        }
        // Wake up waiting persist() calls
        runCallbacks(this, 'wait_persist', null);
    }
}
module.exports = AntiPersistence;

35
common.js Normal file
View File

@ -0,0 +1,35 @@
// Error with an HTTP status code, thrown by API handlers.
// Extends Error so that it carries a stack trace and passes "instanceof Error" checks.
// - code: HTTP status code of the reply
// - message: error text
// - details: optional additional data
class RequestError extends Error
{
    constructor(code, text, details)
    {
        super(text);
        this.name = 'RequestError';
        this.code = code;
        // Keep <message> exactly as passed
        this.message = text;
        this.details = details;
    }
}
// Decode a base64 string; null and undefined are returned unchanged
function de64(k)
{
    // == null matches both null and undefined
    return k == null ? k : Buffer.from(k, 'base64').toString();
}
// Replace obj[key] with <new_value> and then call, without arguments, every
// callback the old list contained. The list is replaced first, so callbacks
// registered while the old ones run end up in the new value
function runCallbacks(obj, key, new_value)
{
    const pending = obj[key];
    obj[key] = new_value;
    if (!pending)
    {
        return;
    }
    for (const callback of pending)
    {
        callback();
    }
}
module.exports = {
RequestError,
de64,
runCallbacks,
};

871
etctree.js Normal file
View File

@ -0,0 +1,871 @@
const crypto = require('crypto');
const stableStringify = require('./stable-stringify.js');
const { RequestError } = require('./common.js');
/*type TreeNode = {
value?: any,
create_revision?: number,
mod_revision?: number,
version?: number,
lease?: string,
children: { [string]: TreeNode },
watchers?: number[],
key_watchers?: number[],
};*/
class EtcTree
{
// Create an empty tree.
// use_base64: when true, de64() / b64() decode and encode base64, otherwise
// they return their argument unchanged
constructor(use_base64)
{
// Root node of the key tree
this.state = {};
// Leases by ID
this.leases = {};
// presumably watchers by ID and the last assigned watcher ID - their use is outside this view
this.watchers = {};
this.watcher_id = 0;
// Revision counters, both are included in dump()
this.mod_revision = 0;
this.compact_revision = 0;
this.use_base64 = use_base64;
// Replication watcher, see set_replicate_watcher()
this.replicate = null;
this.paused = false;
// Pending immediates, cleared by destroy()
this.active_immediate = [];
}
// Shut the tree down: pause leases and cancel all pending immediates
destroy()
{
    this.pause_leases();
    this.active_immediate.forEach((immediate) => clearImmediate(immediate));
}
// Set the replication watcher callback (stored in this.replicate)
set_replicate_watcher(replicate)
{
// Replication watcher is special:
// It should be an async function and it is called BEFORE notifying all
// other watchers about any change.
// It may also throw to prevent notifying at all if replication fails.
this.replicate = replicate;
}
de64(k)
{
if (k == null) // null or undefined
return k;
return this.use_base64 ? Buffer.from(k, 'base64').toString() : k;
}
b64(k)
{
if (k == null) // null or undefined
return k;
return this.use_base64 ? Buffer.from(k).toString('base64') : k;
}
_check(chk)
{
const parts = this._key_parts(this.de64(chk.key));
const { cur } = this._get_subtree(parts, false, false);
let check_value, ref_value;
if (chk.target === 'MOD')
{
check_value = cur && cur.mod_revision || 0;
ref_value = chk.mod_revision || 0;
}
else if (chk.target === 'CREATE')
{
check_value = cur && cur.create_revision || 0;
ref_value = chk.create_revision || 0;
}
else if (chk.target === 'VERSION')
{
check_value = cur && cur.version || 0;
ref_value = chk.version || 0;
}
else if (chk.target === 'LEASE')
{
check_value = cur && cur.lease;
ref_value = chk.lease;
}
else if (chk.target === 'VALUE')
{
check_value = cur && cur.value;
ref_value = chk.value;
}
else
{
throw new RequestError(501, 'Unsupported comparison target: '+chk.target);
}
if (chk.result === 'LESS')
{
return check_value < ref_value;
}
else if (chk.result)
{
throw new RequestError(501, 'Unsupported comparison result: '+chk.result);
}
return check_value == ref_value;
}
_key_parts(key)
{
const parts = key.replace(/\/\/+/g, '/').replace(/\/$/g, ''); // trim beginning?
return parts === '' ? [] : parts.split('/');
}
_get_range(req)
{
const key = this.de64(req.key);
const end = this.de64(req.range_end);
if (end != null && (key[key.length-1] != '/' || end[end.length-1] != '0' ||
end.substr(0, end.length-1) !== key.substr(0, key.length-1)))
{
throw new RequestError(501, 'Non-directory range queries are unsupported');
}
const parts = this._key_parts(key);
return { parts, all: end != null };
}
_get_subtree(parts, create, notify)
{
let cur = this.state;
let watchers = notify ? [] : null;
for (let k of parts)
{
if (notify && cur.watchers)
{
watchers.push.apply(watchers, cur.watchers);
}
if (!cur.children)
{
if (!create)
{
return {};
}
cur.children = {};
}
if (!cur.children[k])
{
if (!create)
{
return {};
}
cur.children[k] = {};
}
cur = cur.children[k];
}
if (notify && cur.watchers)
{
watchers.push.apply(watchers, cur.watchers);
}
return { watchers, cur };
}
// create a snapshot of all data including leases
dump(persistent_only, value_filter)
{
const snapshot = {
state: this._copy_tree(this.state, persistent_only, value_filter) || {},
mod_revision: this.mod_revision,
compact_revision: this.compact_revision,
};
if (!persistent_only)
{
snapshot.leases = {};
for (const id in this.leases)
{
const lease = this.leases[id];
snapshot.leases[id] = { ttl: lease.ttl, expires: lease.expires };
}
}
return snapshot;
}
// Recursively copy a tree node for dump().
// @param cur           node to copy
// @param no_lease      when truthy, values that have a lease are treated as absent
// @param value_filter  optional function(value); its result replaces the value in
//                      the copy, and a null/undefined result drops the value
// @returns the copied node, or null when the node has neither a kept value nor
//          any kept children
_copy_tree(cur, no_lease, value_filter)
{
let nonempty = cur.value != null && (!no_lease || !cur.lease);
let filtered;
if (nonempty && value_filter)
{
filtered = value_filter(cur.value);
nonempty = nonempty && filtered != null;
}
// Node fields are copied only when the value is kept; children are rebuilt below
const copy = (nonempty ? { ...cur } : {});
copy.children = {};
if (nonempty && value_filter)
{
copy.value = filtered;
}
// Watcher registrations are never part of a snapshot
delete copy.watchers;
delete copy.key_watchers;
let has_children = false;
for (const k in cur.children)
{
const child = this._copy_tree(cur.children[k], no_lease, value_filter);
if (child)
{
copy.children[k] = child;
has_children = true;
}
}
if (!nonempty && !has_children)
{
return null;
}
if (!has_children)
{
delete copy.children;
}
return copy;
}
// load snapshot of all data including leases
// @param snapshot     object in the format produced by dump()
// @param update_only  when truthy, merge: revisions and nodes are only taken from the
//                     snapshot where the conditions below / in _restore_diff() allow it,
//                     and nothing is deleted; when falsy, leases and keys missing
//                     from the snapshot are removed
// Watchers are notified about the resulting changes at the end.
load(snapshot, update_only)
{
if (!update_only || this.mod_revision < snapshot.mod_revision)
{
this.mod_revision = snapshot.mod_revision;
}
// NOTE(review): in update_only mode this keeps the SMALLER compact_revision - confirm that is intended
if (!update_only || this.compact_revision > (snapshot.compact_revision||0))
{
this.compact_revision = snapshot.compact_revision||0;
}
// First apply leases
const notifications = [];
if (!update_only && snapshot.leases)
{
for (const id in this.leases)
{
if (!snapshot.leases[id])
{
// Revoke without replicating and notifying
this._sync_revoke_lease(id, notifications, this.mod_revision);
}
}
}
for (const id in snapshot.leases||{})
{
this.load_lease({ id, ...snapshot.leases[id] });
}
// Then find and apply the difference in data
this._restore_diff(update_only, this.state, snapshot.state, null, this.state.watchers || [], notifications);
this._notify(notifications);
}
// Recursively apply the difference between a live node and a snapshot node.
// @param update_only    merge mode: a node is overwritten only if it has no mod_revision
//                       or an older one than the snapshot node; nothing is deleted
// @param cur_old        live node (modified in place)
// @param cur_new        snapshot node
// @param prefix         key of the node, null for the root
// @param watchers       subtree watcher ids collected on the way down
// @param notifications  output array; one entry is appended per changed value
_restore_diff(update_only, cur_old, cur_new, prefix, watchers, notifications)
{
if (!update_only || !cur_old.mod_revision || cur_old.mod_revision < cur_new.mod_revision)
{
const key = prefix === null ? '' : prefix;
if (!eq(cur_old.lease, cur_new.lease))
{
// Move the key from the old lease's key set to the new one
if (cur_old.lease && this.leases[cur_old.lease])
{
delete this.leases[cur_old.lease].keys[key];
}
cur_old.lease = cur_new.lease;
if (cur_new.lease && this.leases[cur_new.lease])
{
this.leases[cur_new.lease].keys[key] = true;
}
}
cur_old.mod_revision = cur_new.mod_revision;
cur_old.create_revision = cur_new.create_revision;
cur_old.version = cur_new.version;
if (!eq(cur_old.value, cur_new.value))
{
cur_old.value = cur_new.value;
const key_watchers = (cur_old.key_watchers ? [ ...watchers, ...(cur_old.key_watchers||[]) ] : watchers);
// NOTE(review): key and value are pushed without b64(), unlike _put()/_delete_all() - confirm for use_base64
const notify = { watchers: key_watchers, key, value: cur_new.value, mod_revision: cur_new.mod_revision };
if (cur_new.lease)
{
notify.lease = cur_new.lease;
}
notifications.push(notify);
}
}
cur_old.children = cur_old.children || {};
for (const k in cur_new.children)
{
if (!cur_old.children[k])
{
// NOTE(review): the snapshot subtree is adopted by reference here, so no notifications
// are produced and no lease key sets are updated for keys inside it - confirm
cur_old.children[k] = cur_new.children[k];
}
else
{
this._restore_diff(
update_only, cur_old.children[k], cur_new.children[k],
prefix === null ? k : prefix+'/'+k,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
notifications
);
}
}
if (!update_only)
{
// Full load: children that are absent from the snapshot are deleted
for (const k in cur_old.children)
{
if (!cur_new.children || !cur_new.children[k])
{
// Delete subtree
this._delete_all(
notifications,
cur_old.children[k].watchers ? [ ...watchers, ...cur_old.children[k].watchers ] : watchers,
cur_old.children[k], true,
prefix === null ? k : prefix+'/'+k,
this.mod_revision
);
}
}
}
}
// slave/follower nodes don't expire leases themselves, they listen for the leader instead
pause_leases()
{
if (this.paused)
{
return;
}
this.paused = true;
for (const id in this.leases)
{
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
}
}
// Undo pause_leases(): re-arm an expiration timer for every known lease
resume_leases()
{
    if (this.paused)
    {
        this.paused = false;
        for (const id in this.leases)
        {
            this._set_expire(id);
        }
    }
}
_set_expire(id)
{
if (!this.paused)
{
const lease = this.leases[id];
if (!lease.timer_id)
{
lease.timer_id = setTimeout(() => this.api_revoke_lease({ ID: id }).catch(console.error), lease.expires - Date.now());
}
}
}
async api_grant_lease(req)
{
let id;
while (!id || this.leases[id])
{
id = crypto.randomBytes(8).toString('hex');
}
const expires = Date.now() + req.TTL*1000;
this.leases[id] = { ttl: req.TTL, expires, timer_id: null, keys: {} };
this.mod_revision++;
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl: req.TTL, expires } ] });
}
return { header: { revision: this.mod_revision }, ID: id, TTL: req.TTL };
}
async api_keepalive_lease(req)
{
const id = req.ID;
if (!this.leases[id])
{
throw new RequestError(400, 'unknown lease');
}
const lease = this.leases[id];
if (lease.timer_id)
{
clearTimeout(lease.timer_id);
lease.timer_id = null;
}
const ttl = this.leases[id].ttl;
lease.expires = Date.now() + ttl*1000;
this.mod_revision++;
this._set_expire(id);
if (this.replicate)
{
await this.replicate({ header: { revision: this.mod_revision }, leases: [ { id, ttl, expires: lease.expires } ] });
}
// extra wrapping in { result: ... }
return { result: { header: { revision: this.mod_revision }, ID: id, TTL: ''+ttl } };
}
load_lease(lease)
{
const id = lease.id;
if (!this.leases[id])
{
this.leases[id] = { ...lease, timer_id: null, keys: {} };
}
else if (this.leases[id].ttl != lease.ttl ||
this.leases[id].expires != lease.expires)
{
this.leases[id].ttl = lease.ttl;
this.leases[id].expires = lease.expires;
}
else
{
return false;
}
if (this.leases[id].timer_id)
{
clearTimeout(this.leases[id].timer_id);
this.leases[id].timer_id = null;
}
this._set_expire(id);
return true;
}
// Synchronously revoke a lease: delete all keys attached to it and forget the lease.
// Does not replicate and does not notify watchers - the caller does that.
// @param id             lease id
// @param notifications  output array receiving one entry per deleted key
// @param next_revision  revision to stamp on the deletions
// @throws RequestError(400) when the lease does not exist
_sync_revoke_lease(id, notifications, next_revision)
{
    const lease = this.leases[id];
    if (!lease)
    {
        throw new RequestError(400, 'unknown lease');
    }
    // Cancel the pending expiration timer: otherwise it fires later for a lease
    // that no longer exists and only produces an 'unknown lease' error
    if (lease.timer_id)
    {
        clearTimeout(lease.timer_id);
        lease.timer_id = null;
    }
    for (const key in lease.keys)
    {
        // Lease key sets hold decoded keys while _delete_range() expects request
        // encoding (it runs de64() on the key), so encode them back
        this._delete_range({ key: this.b64(key) }, next_revision, notifications);
    }
    delete this.leases[id];
}
async api_revoke_lease(req, no_throw)
{
const notifications = [];
if (!this.leases[req.ID])
{
if (no_throw)
return null;
throw new RequestError(400, 'unknown lease');
}
this.mod_revision++;
this._sync_revoke_lease(req.ID, notifications, this.mod_revision);
if (this.replicate)
{
await this.notify_replicator(notifications, [ { id: req.ID } ]);
}
this._notify(notifications);
return { header: { revision: this.mod_revision } };
}
async notify_replicator(notifications, leases)
{
// First replicate the change and then notify watchers about it
const all_changes = {};
for (const chg of notifications)
{
all_changes[chg.key] = { ...chg };
delete all_changes[chg.key].watchers;
}
await this.replicate({ header: { revision: this.mod_revision }, events: Object.values(all_changes), leases });
}
async apply_replication(msg)
{
this.mod_revision = msg.header.revision;
const notifications = [];
if ((msg.leases||[]).length)
{
for (const lease of msg.leases)
{
if (lease.ttl)
{
this.load_lease(lease);
}
else
{
this._sync_revoke_lease(lease.id, notifications, this.mod_revision);
}
}
}
if ((msg.events||[]).length)
{
for (const ev of msg.events)
{
if (ev.value == null)
{
this._delete_range({ key: ev.key }, ev.mod_revision, notifications);
}
else
{
this._put({ key: ev.key, value: ev.value, lease: ev.lease }, ev.mod_revision, notifications);
}
}
}
if (this.replicate)
{
await this.notify_replicator(notifications, msg.leases);
}
this._notify(notifications);
}
// forget deletions before compact_revision
// @param compact_revision  nodes pruned by _compact() are those with a mod_revision
//                          lower than this; it is then stored as this.compact_revision
compact(compact_revision)
{
this._compact(compact_revision, this.state);
this.compact_revision = compact_revision;
}
// Recursively prune deleted leaf nodes older than compact_revision.
// A child is removed when it has no children, no value, a mod_revision lower than
// compact_revision and no watchers attached to it.
// @param compact_revision  revision below which deletions are forgotten
// @param cur               node whose children are examined
_compact(compact_revision, cur)
{
    for (const key in cur.children||{})
    {
        const child = cur.children[key];
        this._compact(compact_revision, child);
        // Watch registrations live on the tree nodes themselves (see api_create_watch),
        // so dropping a node that still has them would silently detach active watches
        const watched = (child.watchers && child.watchers.length) ||
            (child.key_watchers && child.key_watchers.length);
        if (!watched && emptyObj(child.children) && child.value == null &&
            child.mod_revision < compact_revision)
        {
            delete cur.children[key];
        }
    }
}
// Register a watch on a key or on a directory range.
// @param req   { key, range_end?, start_revision?, watch_id? }
// @param send  function called with { result: { header, events } } messages
// @returns { watch_id, created: true }, or { compact_revision } when start_revision
//          is older than the compaction point
// @throws RequestError(400) when watch_id is an object; RequestError(501) from
//         _get_range() for unsupported ranges
api_create_watch(req, send)
{
const { parts, all } = this._get_range(req);
if (req.start_revision && this.compact_revision && this.compact_revision > req.start_revision)
{
// Deletions up to this.compact_revision are forgotten
return { compact_revision: this.compact_revision };
}
let watch_id = req.watch_id;
if (watch_id instanceof Object)
{
throw new RequestError(400, 'invalid watch_id');
}
// Allocate an id from the internal counter when the client did not supply one
if (!watch_id)
{
watch_id = ++this.watcher_id;
}
// An existing watch_id is reused: the path is added to it and its original send() is kept
if (!this.watchers[watch_id])
{
this.watchers[watch_id] = {
paths: [],
send,
};
}
this.watchers[watch_id].paths.push(parts);
// The watch id is stored on the tree node itself, creating the node if needed
const { cur } = this._get_subtree(parts, true, false);
if (all)
{
// directory watch: applies to the node and everything below it
cur.watchers = cur.watchers || [];
cur.watchers.push(watch_id);
}
else
{
// single key watch
cur.key_watchers = cur.key_watchers || [];
cur.key_watchers.push(watch_id);
}
// NOTE(review): _get_modified() includes mod_revision == start_revision but this condition
// skips the replay when start_revision == this.mod_revision - confirm which one is intended
if (req.start_revision && req.start_revision < this.mod_revision)
{
// Send initial changes
const imm = setImmediate(() =>
{
this.active_immediate = this.active_immediate.filter(i => i !== imm);
const events = [];
// NOTE(review): the replay walks from the tree root, so it appears to include
// changes outside of the watched path - confirm
const { cur } = this._get_subtree([], false, false);
this._get_modified(events, cur, null, req.start_revision);
send({ result: { header: { revision: this.mod_revision }, events } });
});
this.active_immediate.push(imm);
}
return { watch_id, created: true };
}
// Collect PUT/DELETE events for every node modified at or after min_rev.
// @param events   output array
// @param cur      node to start from
// @param prefix   key of <cur>, null for the root
// @param min_rev  minimum mod_revision (inclusive)
_get_modified(events, cur, prefix, min_rev)
{
    if (cur.mod_revision >= min_rev)
    {
        // The root node has a null prefix; report it as the empty key in both branches
        const key = this.b64(prefix === null ? '' : prefix);
        const ev = cur.value == null
            // mod_revision is included so that replayed deletions look like live ones
            ? { type: 'DELETE', kv: { key, mod_revision: cur.mod_revision } }
            : { type: 'PUT', kv: { key, value: this.b64(cur.value), mod_revision: cur.mod_revision } };
        if (cur.lease)
        {
            ev.kv.lease = cur.lease;
        }
        events.push(ev);
    }
    if (cur.children)
    {
        for (const k in cur.children)
        {
            this._get_modified(events, cur.children[k], prefix === null ? k : prefix+'/'+k, min_rev);
        }
    }
}
api_cancel_watch(watch_id)
{
if (this.watchers[watch_id])
{
for (const parts of this.watchers[watch_id].paths)
{
const { cur } = this._get_subtree(parts, false, false);
if (cur)
{
if (cur.watchers)
{
cur.watchers = cur.watchers.filter(id => id != watch_id);
if (!cur.watchers.length)
cur.watchers = null;
}
if (cur.key_watchers)
{
cur.key_watchers = cur.key_watchers.filter(id => id != watch_id);
if (!cur.key_watchers.length)
cur.key_watchers = null;
}
}
}
delete this.watchers[watch_id];
}
return { canceled: true };
}
_notify(notifications)
{
if (!notifications.length)
{
return;
}
const by_watcher = {};
for (const notif of notifications)
{
const watchers = notif.watchers;
delete notif.watchers;
const conv = { type: ('value' in notif) ? 'PUT' : 'DELETE', kv: notif };
for (const wid of watchers)
{
if (this.watchers[wid])
{
by_watcher[wid] = by_watcher[wid] || { header: { revision: this.mod_revision }, events: {} };
by_watcher[wid].events[notif.key] = conv;
}
}
}
for (const wid in by_watcher)
{
by_watcher[wid].events = Object.values(by_watcher[wid].events);
this.watchers[wid].send({ result: by_watcher[wid] });
}
}
async api_txn({ compare, success, failure })
{
const failed = (compare || []).filter(chk => !this._check(chk)).length > 0;
const responses = [];
const notifications = [];
const next_revision = this.mod_revision + 1;
for (const req of (failed ? failure : success) || [])
{
responses.push(this._txn_action(req, next_revision, notifications));
}
if (this.replicate && notifications.length)
{
// First replicate the change and then notify watchers about it
await this.notify_replicator(notifications);
}
this._notify(notifications);
return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
}
_txn_action(req, cur_revision, notifications)
{
if (req.request_range || req.requestRange)
{
return { response_range: this._range(req.request_range || req.requestRange) };
}
else if (req.request_put || req.requestPut)
{
return { response_put: this._put(req.request_put || req.requestPut, cur_revision, notifications) };
}
else if (req.request_delete_range || req.requestDeleteRange)
{
return { response_delete_range: this._delete_range(req.request_delete_range || req.requestDeleteRange, cur_revision, notifications) };
}
return {};
}
_range(request_range)
{
// FIXME: limit, revision(-), sort_order, sort_target, serializable(-),
// count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
const { parts, all } = this._get_range(request_range);
const { cur } = this._get_subtree(parts, false, false);
const kvs = [];
if (cur)
{
this._get_all(kvs, cur, all, parts.join('/') || null, request_range);
}
return { kvs };
}
_put(request_put, cur_revision, notifications)
{
// FIXME: prev_kv, ignore_value(?), ignore_lease(?)
const parts = this._key_parts(this.de64(request_put.key));
const key = parts.join('/');
const value = this.de64(request_put.value);
const { cur, watchers } = this._get_subtree(parts, true, true);
if (cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
if (!eq(cur.value, value) || cur.lease != request_put.lease)
{
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[key];
}
if (request_put.lease)
{
if (!this.leases[request_put.lease])
{
throw new RequestError(400, 'unknown lease: '+request_put.lease);
}
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;
}
else if (cur.lease)
{
cur.lease = null;
}
this.mod_revision = cur_revision;
cur.version = (cur.version||0) + 1;
cur.mod_revision = cur_revision;
if (cur.value == null)
{
cur.create_revision = cur_revision;
}
cur.value = value;
const notify = { watchers, key: this.b64(key), value: this.b64(value), mod_revision: cur.mod_revision };
if (cur.lease)
{
notify.lease = cur.lease;
}
notifications.push(notify);
}
return {};
}
_delete_range(request_delete_range, cur_revision, notifications)
{
// FIXME: prev_kv
const { parts, all } = this._get_range(request_delete_range);
const { cur, watchers } = this._get_subtree(parts, false, true);
const prevcount = notifications.length;
if (cur)
{
this._delete_all(notifications, watchers, cur, all, parts.join('/') || null, cur_revision);
}
return { deleted: notifications.length-prevcount };
}
_get_all(kvs, cur, all, prefix, req)
{
if (req.limit && kvs.length > req.limit)
{
return;
}
if (cur.value != null)
{
const item = { key: this.b64(prefix === null ? '' : prefix) };
if (!req.keys_only)
{
item.value = this.b64(cur.value);
item.mod_revision = cur.mod_revision;
//item.create_revision = cur.create_revision;
//item.version = cur.version;
if (cur.lease)
{
item.lease = cur.lease;
}
}
kvs.push(item);
}
if (all && cur.children)
{
for (let k in cur.children)
{
this._get_all(kvs, cur.children[k], true, prefix === null ? k : prefix+'/'+k, req);
}
}
}
_delete_all(notifications, watchers, cur, all, prefix, cur_revision)
{
if (cur.value != null)
{
// Do not actually forget the key until the deletion is confirmed by all replicas
// ...and until it's not required by watchers
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[prefix === null ? '' : prefix];
}
cur.value = null;
cur.version = 0;
cur.create_revision = null;
cur.mod_revision = cur_revision;
this.mod_revision = cur_revision;
notifications.push({
watchers: cur.key_watchers ? [ ...watchers, ...cur.key_watchers ] : watchers,
key: this.b64(prefix === null ? '' : prefix),
mod_revision: cur_revision,
});
}
if (all && cur.children)
{
for (let k in cur.children)
{
const subw = cur.children[k].watchers ? [ ...watchers, ...cur.children[k].watchers ] : watchers;
this._delete_all(notifications, subw, cur.children[k], true, prefix === null ? k : prefix+'/'+k, cur_revision);
}
}
}
}
// Loose equality for stored values: if at least one side is an object,
// both are compared by their stable JSON representation, otherwise with ==
function eq(a, b)
{
    const structured = a instanceof Object || b instanceof Object;
    return structured ? stableStringify(a) === stableStringify(b) : a == b;
}
// true for falsy values and for objects without enumerable properties
// (inherited enumerable properties count as well)
function emptyObj(obj)
{
    let has_keys = false;
    if (obj)
    {
        for (const k in obj)
        {
            has_keys = true;
            break;
        }
    }
    return !has_keys;
}
// Expose the comparison helper as a static property of the class
EtcTree.eq = eq;
// The class itself is the module's export
module.exports = EtcTree;

179
etctree.spec.js Normal file
View File

@ -0,0 +1,179 @@
const EtcTree = require('./etctree.js');
// Test name => async test function; executed in insertion order by the runner at the bottom
const tests = {};
// Name of the test being executed, used by expect() in failure reports
let cur_test = '';
// Compare the real value <a> with the expected value <b> using EtcTree.eq().
// On mismatch print both values with a stack trace and exit with code 1.
const expect = (a, b) =>
{
    if (EtcTree.eq(a, b))
    {
        return;
    }
    const expected = JSON.stringify(b);
    const real = JSON.stringify(a);
    // drop the first two lines of the stack (the message and this function's own frame)
    const where = new Error().stack.replace(/^.*\n.*\n/, '');
    process.stderr.write(cur_test+' test:\nexpected: '+expected+'\nreal: '+real+'\n'+where+'\n');
    process.exit(1);
};
// Basic put/range round trip through api_txn(), including key normalization,
// directory range reads, MOD revision comparisons and the resulting dump
tests['read/write'] = async () =>
{
const t = new EtcTree();
// Put with a doubled slash in the key
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
// The key is readable under its normalized name
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/global' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
// Directory range read
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
// Range over a missing directory returns nothing
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitasto/', range_end: '/vitasto0' } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_range: { kvs: [] } } ] }
);
// Comparison fails (mod_revision 1 is not LESS than 1): the failure branch is executed
expect(
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 1, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ],
failure: [ { request_range: { key: '/vitastor/config/global' } } ],
}),
{ header: { revision: 1 }, succeeded: false, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 1, value: { hello: 'world' } } ],
} } ] }
);
// Comparison succeeds (1 is LESS than 2): the value is updated
expect(
await t.api_txn({
compare: [ { key: '/vitastor/config/global', target: 'MOD', mod_revision: 2, result: 'LESS' } ],
success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world2' } } } ]
}),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
await t.api_txn({ success: [ { request_range: { key: '/vitastor/config/', range_end: '/vitastor/config0' } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_range: {
kvs: [ { key: '/vitastor/config/global', mod_revision: 2, value: { hello: 'world2' } } ],
} } ] }
);
// Full dump of the tree after the two writes
expect(
t.dump(false),
{"state":{"children":{"":{"children":{"vitastor":{"children":{"config":{"children":{"global":{"version":2,"mod_revision":2,"create_revision":1,"value":{"hello":"world2"}}}}}}}}}},"mod_revision":2,"leases":{}}
);
t.destroy();
};
// A directory watch receives a PUT event for a key written below the watched prefix
tests['watch'] = async () =>
{
const t = new EtcTree();
// Messages delivered to the watcher are collected here
const sent = [];
const send = (event) => sent.push(event);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor//config/global', value: { hello: 'world' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send),
{ watch_id: 1, created: true }
);
// No start_revision was given, so nothing is replayed
expect(sent, []);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'PUT', kv: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' }, mod_revision: 2 } } ] } } ]);
t.destroy();
};
// A key attached to a lease is deleted when the lease expires, the watcher gets a
// DELETE event, and a dump taken before expiration can be loaded back
tests['lease'] = async () =>
{
const t = new EtcTree();
const sent = [];
const send = (event) => sent.push(event);
// Lease with a TTL of half a second
const leaseID = (await t.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
expect(
t.api_create_watch({ watch_id: 1, key: '/vitastor/', range_end: '/vitastor0' }, send),
{ watch_id: 1, created: true }
);
expect(sent, []);
const dump = t.dump(false);
// The expiration time is not predictable, so it is taken from the dump itself
const expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":1,"mod_revision":1,"create_revision":1,"value":{"ip":"1.2.3.4"}}}}}}}}}}}},"mod_revision":1,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
// Wait (in real time) until the lease has expired
await new Promise(ok => setTimeout(ok, 600));
expect(sent, [ { result: { header: { revision: 2 }, events: [ { type: 'DELETE', kv: { key: '/vitastor/osd/state/1', mod_revision: 2 } } ] } } ]);
// Leases are paused before loading so that the restored lease does not expire again
t.pause_leases();
t.load(dump);
expect(t.dump(false), dump);
// The same dump loaded into an empty tree
const t2 = new EtcTree();
t2.pause_leases();
t2.load(dump);
expect(t2.dump(false), dump);
t.destroy();
t2.destroy();
};
// Two diverged trees exchange dumps in update_only mode and both end up
// with the newer version of the key
tests['update'] = async () =>
{
const t1 = new EtcTree();
const t2 = new EtcTree();
const leaseID = (await t1.api_grant_lease({ TTL: 0.5 })).ID;
expect(leaseID != null, true);
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.4' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
// t2 writes a conflicting value at revision 1
expect(
await t2.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.6' } } } ] }),
{ header: { revision: 1 }, succeeded: true, responses: [ { response_put: {} } ] }
);
// t1 moves on to revision 2
expect(
await t1.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', lease: leaseID, value: { ip: '1.2.3.5' } } } ] }),
{ header: { revision: 2 }, succeeded: true, responses: [ { response_put: {} } ] }
);
// Cross-load: the dump of t2 is taken before t2 itself is updated
let dump2 = t2.dump();
t2.load(t1.dump(), true);
t1.load(dump2, true);
let dump = t2.dump(false);
let expires = dump.leases[leaseID].expires;
expect(dump, {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
expect(t1.dump(false), {"state":{"children":{"":{"children":{"vitastor":{"children":{"osd":{"children":{"state":{"children":{"1":{"lease":leaseID,"version":2,"mod_revision":2,"create_revision":1,"value":{"ip":"1.2.3.5"}}}}}}}}}}}},"mod_revision":2,"leases":{[leaseID]:{"ttl":0.5,"expires":expires}}});
t1.destroy();
t2.destroy();
};
// An error thrown by the replication callback propagates out of api_txn()
tests['replicate watcher'] = async () =>
{
const t = new EtcTree();
t.set_replicate_watcher(async () =>
{
throw new Error('replication failed');
});
// false, or the caught error
let thrown = false;
try
{
await t.api_txn({ success: [ { request_put: { key: '/vitastor/osd/state/1', value: { ip: '1.2.3.4' } } } ] });
}
catch (e)
{
thrown = e;
}
expect(thrown && thrown.message == 'replication failed', true);
t.destroy();
};
// Run all tests sequentially in definition order.
// An exception thrown by a test is printed and turned into a non-zero exit code,
// so that "node etctree.spec.js && ..." chains stop on failure.
(async function()
{
    for (cur_test in tests)
    {
        await tests[cur_test]();
        console.log(cur_test+' test: OK');
    }
})().catch((e) =>
{
    console.error(e);
    process.exit(1);
});

44
faketimer.js Normal file
View File

@ -0,0 +1,44 @@
// Fake timer for node.js which _guarantees_ that all asynchronous timer
// callbacks are called during a tested period
//
// @param startTime  initial value of the fake clock (defaults to 0)
//
// Instance API: setTimeout, setInterval, setImmediate, clearTimeout, clearInterval,
// clearImmediate and runFor(ms), which advances the clock by <ms> and synchronously
// runs every callback that becomes due, in order of scheduled time.
function FakeTimer(startTime)
{
    // Current fake time
    this.time = Number(startTime)||0;
    // Pending timers: { id, func, at [, repeat] }
    this.timers = [];
    // Last allocated timer id
    this.id = 0;
    this.setTimeout = (func, ms) =>
    {
        this.timers.push({ id: ++this.id, func, at: this.time+ms });
        return this.id;
    };
    this.setInterval = (func, ms) =>
    {
        this.timers.push({ id: ++this.id, func, at: this.time+ms, repeat: ms });
        return this.id;
    };
    // All kinds of timers share one id space, so a single implementation cancels any of
    // them. clearImmediate is provided to match setImmediate below.
    this.clearImmediate = this.clearInterval = this.clearTimeout = (id) =>
    {
        this.timers = this.timers.filter(t => t.id != id);
    };
    this.setImmediate = (func) => this.setTimeout(func, 0);
    this.runFor = (ms) =>
    {
        const end = this.time+ms;
        this.timers.sort((a, b) => a.at - b.at);
        while (this.timers.length && this.timers[0].at <= end)
        {
            const { func, at, repeat } = this.timers[0];
            // One-shot timers are removed, repeating ones are rescheduled
            if (!repeat)
                this.timers.shift();
            else
                this.timers[0].at += repeat;
            if (at > this.time)
                this.time = at;
            func();
            // The callback may have added or removed timers
            this.timers.sort((a, b) => a.at - b.at);
        }
        this.time = end;
    };
}
module.exports = FakeTimer;

140
model_simple.js Normal file
View File

@ -0,0 +1,140 @@
#!/usr/bin/nodejs
// "Stupid" gossip algorithm simulation tool
// Simulate a naive gossip protocol and print the progress of every tick to the console.
// Every tick each node picks <gossip> random peers and <msgcap> random entries from its
// own list (both with repetitions) and sends every picked entry to every picked peer.
// The simulation runs until all nodes are in sync.
// @param options  { total, gossip, msgcap, update, initial }; falsy fields are
//                 replaced by defaults (100, 4, 5, 0, 5)
//                 update != 0: all nodes know each other and the first <update> nodes
//                              bump their own metadata to version 2
//                 update == 0: the first <initial> nodes know all nodes, the rest
//                              only know the first <initial> ones
function test_simple(options)
{
options.total ||= 100;
options.gossip ||= 4;
options.msgcap ||= 5;
options.update ||= 0;
options.initial ||= 5;
let messages_sent = 0;
let tick = 1;
// known[node][other] = metadata version of <other> as seen by <node>
const known = {};
// lists[node] = ids of nodes known to <node>
const lists = {};
// listsv2[node] = ids of nodes whose version 2 metadata reached <node>
const listsv2 = {};
for (let i = 1; i <= options.total; i++)
{
known[i] = {};
lists[i] = [];
for (let j = 1; j <= (options.update ? options.total : options.initial); j++)
{
known[i][j] = 1; // meta version 1
lists[i].push(j);
}
listsv2[i] = [];
}
// The list that has to reach <cmp_n> entries for a node to count as synced
let cmp_lists;
let cmp_n;
if (options.update)
{
// We want to update <options.update> nodes metadata to version 2
for (let i = 1; i <= options.update; i++)
{
known[i][i] = 2;
listsv2[i].push(i);
}
cmp_lists = listsv2;
cmp_n = options.update;
}
else
{
// We want <options.total-options.initial> to join <options.initial>
for (let i = 1; i <= options.initial; i++)
{
if (!known[i][i])
{
known[i][i] = 1;
lists[i].push(i);
}
for (let alive = options.initial+1; alive <= options.total; alive++)
{
if (!known[i][alive])
{
// NOTE(review): `true` is stored here while every other place stores a numeric version - confirm
known[i][alive] = true;
lists[i].push(alive);
}
}
}
cmp_lists = lists;
cmp_n = options.total;
}
// Count the nodes that are already in sync before the first tick
let in_sync = 0;
for (let i = 1; i <= options.total; i++)
{
if (cmp_lists[i].length == cmp_n)
{
in_sync++;
}
}
let avg_known = 0;
while (in_sync < options.total)
{
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
for (let i = 1; i <= options.total; i++)
{
const known_i = lists[i];
// Random recipients, chosen with repetitions from the nodes known to <i>
const send_to = [];
for (let j = 0; j < options.gossip; j++)
{
send_to.push(known_i[0|(Math.random()*known_i.length)]);
}
// Random entries to gossip about
const send_what = [];
for (let j = 0; j < options.msgcap; j++)
{
// FIXME: Exclude duplicates, exclude <send_to>
send_what.push(known_i[0|(Math.random()*known_i.length)]);
}
for (const alive of send_what)
{
for (const to of send_to)
{
// The recipient learns the entry if it is new to it or has a newer version
if (!known[to][alive] || known[i][alive] > known[to][alive])
{
known[to][alive] = known[i][alive];
cmp_lists[to].push(alive);
if (cmp_lists[to].length == cmp_n)
{
console.log('node '+to+': tick '+tick);
in_sync++;
}
}
}
}
messages_sent += send_what.length*send_to.length;
}
// Average length of the compared list over all nodes
avg_known = 0;
for (let i = 1; i <= options.total; i++)
{
avg_known += cmp_lists[i].length;
}
avg_known /= options.total;
tick++;
}
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
console.log(messages_sent+' messages sent');
}
// Command line handling: every "--name value" pair becomes options[name] = value
// converted to a 32-bit integer with 0|value; -h/--help prints the usage and exits
const options = {};
for (let i = 2; i < process.argv.length; i++)
{
if (process.argv[i] === '-h' || process.argv[i] === '--help')
{
// NOTE(review): the usage text shows "--total 1000" while test_simple() defaults total to 100 - confirm
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+` [OPTIONS]
--gossip 4 how many nodes to gossip with every tick
--msgcap 5 how many nodes to gossip about every tick
--total 1000 total nodes
--update 0 total nodes to update if testing update. if 0 then test joining
--initial 5 initial nodes in sync to test joining (when --update is 0)`);
process.exit();
}
else if (process.argv[i].substr(0, 2) == '--')
{
options[process.argv[i].substr(2)] = 0|process.argv[i+1];
// skip the value that was just consumed
i++;
}
}
test_simple(options);

177
model_update.js Normal file
View File

@ -0,0 +1,177 @@
#!/usr/bin/nodejs
// https://github.com/hashicorp/memberlist simulation tool
// Length-limited retransmission queue.
// Items live in <retransmit> buckets. A new item enters the last bucket; every time
// it is returned by shift() it moves one bucket down, and it leaves the queue after
// being returned from bucket 0, i.e. each item is handed out <retransmit> times.
class LimQ
{
    // @param retransmit  number of buckets = number of times an item is handed out
    // @param maxlen      maximum number of items held; undefined means unlimited
    constructor(retransmit, maxlen)
    {
        this.buckets = [];
        for (let i = 0; i < retransmit; i++)
        {
            this.buckets.push([]);
        }
        // Number of items currently held in all buckets
        this.len = 0;
        this.maxlen = maxlen;
    }
    // Add an item; it is silently dropped when the queue is full
    push(item)
    {
        if (this.len >= this.maxlen)
            return;
        const b = this.buckets[this.buckets.length-1];
        b.push(item);
        // Account for the new item, otherwise maxlen is never reached and
        // shift() drives len below zero
        this.len++;
    }
    // Take up to <n> items, the least transmitted ones first
    // @returns array of items
    shift(n)
    {
        let items = [];
        let move = [];
        for (let i = this.buckets.length-1; i >= 0 && items.length < n; i--)
        {
            const rm = this.buckets[i].splice(0, n-items.length);
            items.push.apply(items, rm);
            if (i > 0)
                for (const e of rm)
                    move.push([ e, i-1 ]);
            else
                this.len -= rm.length;
        }
        // Moving is deferred so that an item is not returned twice by one call
        for (const e of move)
        {
            this.buckets[e[1]].push(e[0]);
        }
        return items;
    }
}
// Simulate gossip with per-node retransmission queues (LimQ) and print the progress
// of every tick to the console. Every tick each node makes <gossip> sends: a random
// known peer receives up to <msgcap> entries taken from the sender's queue, and every
// entry that is new to the recipient is put into the recipient's own queue.
// The simulation stops when all nodes are in sync or after <max_ticks> ticks.
// @param options  { gossip, msgcap, max_ticks, total, retransmit, update, initial, max_queue };
//                 falsy fields are replaced by defaults (4, 5, 100000, 100, 12, 0, 5),
//                 max_queue has no default and is passed to LimQ as is
function test_memberlist(options)
{
options.gossip ||= 4;
options.msgcap ||= 5;
options.max_ticks ||= 100000;
options.total ||= 100;
options.retransmit ||= 12;
options.update ||= 0;
options.initial ||= 5;
let tick = 0;
let messages_sent = 0;
// queue[node] = LimQ of entries <node> still has to gossip about
const queue = {};
const known = {}; // { node: { other_node: meta_version } }
// lists[node] = ids of nodes known to <node>
const lists = {};
// listsv2[node] = ids of nodes whose version 2 metadata reached <node>
const listsv2 = {};
for (let i = 1; i <= options.total; i++)
{
known[i] = {};
lists[i] = [];
for (let j = 1; j <= (options.update ? options.total : options.initial); j++)
{
known[i][j] = 1; // meta version 1
lists[i].push(j);
}
listsv2[i] = [];
queue[i] = new LimQ(options.retransmit, options.max_queue);
}
// The list that has to reach <cmp_n> entries for a node to count as synced
let cmp_lists;
let cmp_n;
if (options.update)
{
// We want to update <options.update> nodes metadata to version 2
for (let i = 1; i <= options.update; i++)
{
known[i][i] = 2;
listsv2[i].push(i);
queue[i].push(i);
}
cmp_lists = listsv2;
cmp_n = options.update;
}
else
{
// We want <options.total-options.initial> to join <options.initial>
for (let i = 1; i <= options.initial; i++)
{
for (let alive = options.initial+1; alive <= options.total; alive++)
{
known[i][alive] = 1;
lists[i].push(alive);
queue[i].push(alive);
}
}
cmp_lists = lists;
cmp_n = options.total;
}
// Count the nodes that are already in sync before the first tick
let in_sync = 0;
for (let i = 1; i <= options.total; i++)
{
if (cmp_lists[i].length == cmp_n)
{
in_sync++;
}
}
let avg_known = 0;
while (in_sync < options.total && tick < options.max_ticks)
{
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
for (let i = 1; i <= options.total; i++)
{
const known_i = lists[i];
for (let g = 0; g < options.gossip; g++)
{
// Random recipient among the nodes known to <i>
const to = known_i[0|(Math.random()*known_i.length)];
let send_what = queue[i].shift(options.msgcap);
messages_sent += send_what.length;
for (const alive of send_what)
{
// The recipient learns the entry if it is new to it or has a newer version,
// and then starts gossiping about it itself
if (!known[to][alive] || known[i][alive] > known[to][alive])
{
known[to][alive] = known[i][alive];
cmp_lists[to].push(alive);
queue[to].push(alive);
const cur_updated = cmp_lists[to].length;
if (cur_updated == cmp_n)
{
console.log('node '+to+': synced at tick '+tick);
in_sync++;
}
}
}
}
}
// Average length of the compared list over all nodes
avg_known = 0;
for (let i = 1; i <= options.total; i++)
{
avg_known += cmp_lists[i].length;
}
avg_known /= options.total;
tick++;
}
console.log('tick '+tick+': '+in_sync+' in sync, avg '+avg_known);
console.log(messages_sent+' messages sent');
}
// Command line handling: every "--name value" pair becomes options[name] = value
// converted to a 32-bit integer with 0|value; -h/--help prints the usage and exits
const options = {};
for (let i = 2; i < process.argv.length; i++)
{
if (process.argv[i] === '-h' || process.argv[i] === '--help')
{
// NOTE(review): the usage text shows "--max_queue 1024" but test_memberlist() sets no default for max_queue - confirm
console.error('USAGE: '+process.argv[0]+' '+process.argv[1]+` [OPTIONS]
--gossip 4 how many nodes to gossip with every tick
--msgcap 5 how many "alive" messages fits in a single packet (meta size/UDP packet size in memberlist)
--max_ticks 100000 execution limit
--max_queue 1024 queue size limit
--total 100 total nodes
--retransmit 12 retransmission count. by default log(total)*4 in memberlist
--update 0 total nodes to update if testing update. if 0 then test joining
--initial 5 initial nodes in sync to test joining (when --update is 0)`);
process.exit();
}
else if (process.argv[i].substr(0, 2) == '--')
{
options[process.argv[i].substr(2)] = 0|process.argv[i+1];
// skip the value that was just consumed
i++;
}
}
test_memberlist(options);

33
package.json Normal file
View File

@ -0,0 +1,33 @@
{
"name": "tinyraft",
"version": "1.0.0",
"description": "Tiny & abstract Raft leader election algorithm",
"main": "tinyraft.js",
"scripts": {
"lint": "eslint common.js anticli.js antipersistence.js anticluster.js antietcd.js etctree.js etctree.spec.js tinyraft.js tinyraft.spec.js",
"test": "node etctree.spec.js && node tinyraft.spec.js"
},
"repository": {
"type": "git",
"url": "https://git.yourcmc.ru/vitalif/tinyraft"
},
"keywords": [
"raft"
],
"author": "Vitaliy Filippov",
"license": "MPL-2.0",
"engines": {
"node": ">=12.0.0"
},
"devDependencies": {
"eslint": "^8.0.0",
"eslint-plugin-node": "^11.1.0"
},
"dependencies": {
"ws": "^8.17.0"
},
"bin": {
"antietcd": "./antietcd-app.js",
"anticli": "./anticli.js"
}
}

78
stable-stringify.js Normal file
View File

@ -0,0 +1,78 @@
// Copyright (c) Vitaliy Filippov, 2019+
// License: MIT
/**
 * Serialize a value to JSON with object keys emitted in a deterministic order,
 * so that equal values always produce the same string.
 *
 * Differences from JSON.stringify() that follow from the implementation:
 * only own enumerable keys (Object.keys) are written and toJSON() is not called,
 * so e.g. a Date is written as "{}".
 *
 * @param {*} obj value to serialize
 * @param {Function|Object} [opts] either a comparator function or an options object:
 *   - cmp: function(a, b) receiving { key, value } pairs, used to order object keys
 *     (default: plain Array.prototype.sort() order)
 *   - space: indentation string, or a number of spaces (default: no pretty-printing)
 *   - cycles: if true, circular references are written as "__cycle__" instead of throwing
 * @returns {string|undefined} JSON text, or undefined when <obj> itself is not serializable
 *   (undefined, function, symbol)
 * @throws {TypeError} on a circular reference when opts.cycles is not true
 */
function stableStringify(obj, opts)
{
    let options = opts || {};
    if (typeof options === 'function')
        options = { cmp: options };
    let space = options.space || '';
    if (typeof space === 'number')
        space = Array(space+1).join(' ');
    const cycles = (typeof options.cycles === 'boolean') ? options.cycles : false;
    // A falsy comparator (e.g. null) means "use the default order" instead of being passed to sort()
    const userCmp = options.cmp || undefined;
    const colonSeparator = space ? ': ' : ':';
    // Containers (objects AND arrays) on the current recursion path.
    // Arrays must be tracked too, otherwise a self-referencing array recurses forever.
    const seen = new Set();
    function stringify(node, level)
    {
        if (node === undefined)
        {
            return undefined;
        }
        if (typeof node !== 'object' || node === null)
        {
            // Primitives; functions and symbols yield undefined here
            return JSON.stringify(node);
        }
        if (seen.has(node))
        {
            if (cycles)
                return JSON.stringify('__cycle__');
            throw new TypeError('Converting circular structure to JSON');
        }
        seen.add(node);
        const indent = space ? ('\n' + new Array(level + 1).join(space)) : '';
        const out = [];
        let result;
        if (node instanceof Array)
        {
            for (let i = 0; i < node.length; i++)
            {
                // Non-serializable array items become null, like in JSON.stringify()
                const item = stringify(node[i], level+1) || JSON.stringify(null);
                out.push(indent + space + item);
            }
            result = '[' + out.join(',') + indent + ']';
        }
        else
        {
            const keyCmp = userCmp
                ? (a, b) => userCmp({ key: a, value: node[a] }, { key: b, value: node[b] })
                : undefined;
            const keys = Object.keys(node).sort(keyCmp);
            for (let i = 0; i < keys.length; i++)
            {
                const key = keys[i];
                const value = stringify(node[key], level+1);
                // Non-serializable properties are omitted
                if (!value)
                    continue;
                out.push(indent + space + JSON.stringify(key) + colonSeparator + value);
            }
            result = '{' + out.join(',') + indent + '}';
        }
        // Leaving this container: the same object may legally appear again in a sibling branch
        seen.delete(node);
        return result;
    }
    return stringify(obj, 0);
}
module.exports = stableStringify;

36
tinyraft.d.ts vendored Normal file
View File

@ -0,0 +1,36 @@
import type { EventEmitter } from 'events';
// Event map used as the type argument of EventEmitter by the TinyRaft class.
// Each key is an event name; the array type presumably describes the listener
// arguments as in the generic EventEmitter typings - TODO confirm.
export type TinyRaftEvents = {
// Emitted when the node's state, term, leader or follower list changes
change: {
state: 'LEADER'|'FOLLOWER'|'CANDIDATE',
term: number,
leader: string|number,
// NOTE(review): the implementation passes `followers` only together with
// state 'LEADER'; for other states the field may be absent - confirm
followers: (string|number)[],
}[],
};
// Message exchanged between TinyRaft nodes through the user-supplied send()
// callback and fed back into the receiver with onReceive().
export type RaftMessage = {
type: 'VOTE_REQUEST'|'VOTE'|'PING'|'PONG',
// Term number of the sender
term: number,
// Node the sender currently votes for / follows (sent with VOTE_REQUEST, VOTE and PONG)
leader?: string|number,
// Leader priority of the sender (sent with VOTE_REQUEST, PING and VOTE when configured)
priority?: number,
};
/**
 * Leader election state machine. Network transport is supplied by the user:
 * outgoing messages go through cfg.send(), incoming ones are passed to onReceive().
 * State changes are reported with the 'change' event.
 */
export class TinyRaft extends EventEmitter<TinyRaftEvents>
{
    constructor(cfg: {
        /** IDs of all cluster nodes, including this one */
        nodes: (number|string)[],
        /** ID of this node */
        nodeId: number|string,
        /** Election timeout, in timer units (milliseconds with the default timers) */
        electionTimeout?: number,
        /**
         * Upper bound of the random delay added to every election timer.
         * Falls back to electionTimeout when not a positive number.
         */
        randomTimeout?: number,
        /** Heartbeat interval, 1000 by default */
        heartbeatTimeout?: number,
        /**
         * Leadership expiration timeout, 0 (disabled) by default.
         * If enabled, it should be > heartbeatTimeout and ideally < electionTimeout.
         */
        leadershipTimeout?: number,
        initialTerm?: number,
        /**
         * Priority of this node in elections. A node receiving a vote request
         * with a smaller priority than its own starts a new voting round.
         */
        leaderPriority?: number,
        /** Callback used to deliver <msg> to node <to> */
        send: (to: number|string, msg: RaftMessage) => void,
    });
    /** Start a new voting round */
    start(): void;
    /** Stop the election and heartbeat timers */
    stop(): void;
    /** Process a message received from node <from> */
    onReceive(from: number|string, msg: RaftMessage): void;
    /** Restart the election timer, i.e. treat the current leader as alive */
    markAlive(): void;
    /** Replace the list of cluster nodes */
    setNodes(nodes: (number|string)[]): void;
}

View File

@ -11,6 +11,14 @@
//
// Supports leader expiration like in NuRaft:
// https://github.com/eBay/NuRaft/blob/master/docs/leadership_expiration.md
//
// Also supports leader priorities, similar to NuRaft but even simpler:
// If a node receives a VoteRequest message with larger term but with smaller
// priority than its own, it immediately starts a new voting round.
// It guarantees that a node with non-maximum priority can't become leader
// without being re-elected.
// If all priorities are equal (or just zero), the election algorithm
// becomes identical to the basic algorithm without priorities.
const EventEmitter = require('events');
@ -32,7 +40,10 @@ class TinyRaft extends EventEmitter
// heartbeatTimeout?: number,
// leadershipTimeout?: number,
// initialTerm?: number,
// leaderPriority?: number,
// }
// NOTE: leadershipTimeout, if enabled, should be > heartbeatTimeout, and ideally < electionTimeout
// In that case a leader with dead network will be expired before being re-elected
constructor(config)
{
super();
@ -43,6 +54,7 @@ class TinyRaft extends EventEmitter
this.randomTimeout = config.randomTimeout > 0 ? Number(config.randomTimeout) : this.electionTimeout;
this.heartbeatTimeout = Number(config.heartbeatTimeout) || 1000;
this.leadershipTimeout = Number(config.leadershipTimeout) || 0;
this.leaderPriority = Number(config.leaderPriority) || undefined;
if (!this.nodeId || this.nodeId instanceof Object ||
!(this.nodes instanceof Array) || this.nodes.filter(n => !n || n instanceof Object).length > 0 ||
!(this.send instanceof Function))
@ -52,18 +64,19 @@ class TinyRaft extends EventEmitter
this.term = 0;
this.state = null;
this.leader = null;
this.confirmed = {};
}
_nextTerm(after)
{
if (this.electionTimer)
{
clearTimeout(this.electionTimer);
TinyRaft.clearTimeout(this.electionTimer);
this.electionTimer = null;
}
if (after >= 0)
{
this.electionTimer = setTimeout(() => this.start(), after + this.randomTimeout * Math.random());
this.electionTimer = TinyRaft.setTimeout(() => this.start(), after + this.randomTimeout * Math.random());
}
}
@ -76,16 +89,17 @@ class TinyRaft extends EventEmitter
this.votes = { [this.nodeId]: [ this.nodeId ] };
this.state = CANDIDATE;
this.followers = null;
this.confirmed = {};
this.leader = this.nodeId;
if (!this.heartbeatTimer)
{
this.heartbeatTimer = setInterval(() => this._heartbeat(), this.heartbeatTimeout);
this.heartbeatTimer = TinyRaft.setInterval(() => this._heartbeat(), this.heartbeatTimeout);
}
for (const node of this.nodes)
{
if (node != this.nodeId)
{
this.send(node, { type: VOTE_REQUEST, term: this.term, leader: this.leader });
this.send(node, { type: VOTE_REQUEST, term: this.term, leader: this.leader, priority: this.leaderPriority });
}
}
// Next term will start right after this one times out
@ -97,12 +111,12 @@ class TinyRaft extends EventEmitter
{
if (this.electionTimer)
{
clearTimeout(this.electionTimer);
TinyRaft.clearTimeout(this.electionTimer);
this.electionTimer = null;
}
if (this.heartbeatTimer)
{
clearTimeout(this.heartbeatTimer);
TinyRaft.clearInterval(this.heartbeatTimer);
this.heartbeatTimer = null;
}
}
@ -111,11 +125,12 @@ class TinyRaft extends EventEmitter
{
if (this.state == LEADER)
{
for (const node of this.votes[this.nodeId])
this.confirmed = {};
for (const node of (this.leadershipTimeout ? this.followers : this.nodes))
{
if (node != this.nodeId)
{
this.send(node, { type: PING, term: this.term });
this.send(node, { type: PING, term: this.term, priority: this.leaderPriority });
}
}
}
@ -125,89 +140,153 @@ class TinyRaft extends EventEmitter
{
if (msg.type == VOTE_REQUEST)
{
if (msg.term > this.term && msg.leader)
{
this.leader = msg.leader;
this.term = msg.term;
this.state = CANDIDATE;
this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
this.emit('change', { state: this.state, term: this.term, leader: this.leader });
}
this.send(from, { type: VOTE, term: this.term, leader: this.leader });
this._onReceiveVoteRequest(from, msg);
}
else if (msg.type == VOTE && msg.term == this.term)
else if (msg.type == VOTE)
{
this.voted++;
this.votes[msg.leader] = this.votes[msg.leader] || [];
this.votes[msg.leader].push(from);
const n = this.votes[msg.leader].length;
if (n == 1 + (0 | this.nodes.length/2))
{
if (msg.leader == this.nodeId)
{
this.leader = msg.leader;
this.state = LEADER;
this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
this.followers = this.votes[this.nodeId];
for (const follower of this.followers)
{
if (follower != this.nodeId)
{
// Send a heartbeat to confirm leadership
this.send(follower, { type: PING, term: this.term });
}
}
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
}
else
{
this._nextTerm(0);
}
}
else if (n > this.nodes.length/2 && this.state == LEADER && msg.leader == this.nodeId)
{
this.followers = this.votes[this.nodeId];
// Send a heartbeat to confirm leadership
this.send(from, { type: PING, term: this.term });
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
}
else if (this._isVotingFailed())
{
this._nextTerm(0);
}
this._onReceiveVote(from, msg);
}
else if (msg.type == PING)
{
if (this.state == CANDIDATE && this.term == msg.term && from == this.leader)
{
this.state = FOLLOWER;
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId });
}
if (this.state == FOLLOWER && from == this.leader)
{
this.markAlive();
}
if (this.leadershipTimeout > 0)
{
this.send(from, { type: PONG, term: this.term, leader: this.leader });
}
this._onReceivePing(from, msg);
}
else if (msg.type == PONG && this.state == LEADER)
else if (msg.type == PONG)
{
if (msg.leader != this.nodeId)
this._onReceivePong(from, msg);
}
}
// Handle a VOTE_REQUEST received from node <from>.
// A request with a newer term and a non-empty leader either makes this node
// start its own voting round (when its leaderPriority is higher than the
// sender's) or makes it adopt the proposed leader and term as a CANDIDATE.
// In every case the node answers with a VOTE carrying its current term and leader.
_onReceiveVoteRequest(from, msg)
{
if (msg.term > this.term && msg.leader)
{
if (this.leaderPriority && (msg.priority||0) < this.leaderPriority)
{
// The sender has a lower priority than us: take its term number
// and immediately start a new voting round ourselves
this.term = msg.term;
this.start();
}
else
{
// Accept the proposed leader for the new term
this.leader = msg.leader;
this.term = msg.term;
this.state = CANDIDATE;
this.followers = null;
this.confirmed = {};
// Re-arm the election timer: a new voting round starts if it is not reset in time
this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
this.emit('change', { state: this.state, term: this.term, leader: this.leader });
}
}
// Our priority is only attached when we vote for ourselves
const prio = (this.leader == this.nodeId ? this.leaderPriority : undefined);
this.send(from, { type: VOTE, term: this.term, leader: this.leader, priority: prio });
}
// Handle a VOTE received from node <from>: <from> votes for <msg.leader> in term <msg.term>.
// - Votes without a leader or from an older term are ignored.
// - A vote from a newer term makes this node a FOLLOWER of <msg.leader>.
// - A vote for the current term is counted (once per voter); reaching the
//   majority either makes this node the LEADER or schedules a new voting round.
_onReceiveVote(from, msg)
{
if (!msg.leader || msg.term < this.term)
{
return;
}
if (msg.term > this.term)
{
// Someone is ahead of us: adopt its term and leader, drop our own vote counts
this.term = msg.term;
this.state = FOLLOWER;
this.followers = null;
this.confirmed = {};
this.leader = msg.leader;
this.votes = {};
this.emit('change', { state: this.state, term: this.term, leader: this.leader });
// Repeat VOTE to the leader to join it
this.send(this.leader, { type: VOTE, term: this.term, leader: this.leader, priority: msg.priority });
return;
}
// add <from> as voter for <msg.leader>
this.votes[msg.leader] = this.votes[msg.leader] || [];
let found = false;
// Loose comparison: node IDs may be numbers or strings
for (const voter of this.votes[msg.leader])
{
if (voter == from)
{
found = true;
break;
}
}
if (!found)
{
this.voted++;
this.votes[msg.leader].push(from);
}
const n = this.votes[msg.leader].length;
// Exactly the majority (more than half of the nodes) has just been reached
if (n == 1 + (0 | this.nodes.length/2))
{
if (msg.leader == this.nodeId)
{
// We have won the election
this.leader = msg.leader;
this.state = LEADER;
this.confirmed = {};
// With leadership expiration enabled the timer makes us re-run the election
// unless it is re-armed; otherwise (-1) the election timer is only cleared
this._nextTerm(this.leadershipTimeout > 0 ? this.leadershipTimeout : -1);
this.followers = this.votes[this.nodeId];
// Send a heartbeat to confirm leadership
this._heartbeat();
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
}
else
{
// Another node got the majority: schedule a new voting round after a random delay
this._nextTerm(0);
}
}
else if (n > this.nodes.length/2 && this.state == LEADER && msg.leader == this.nodeId)
{
// One more node joined us after we had already become the leader
this.followers = this.votes[this.nodeId];
// Send a heartbeat to confirm leadership
this.send(from, { type: PING, term: this.term, priority: this.leaderPriority });
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
}
// _isVotingFailed() is defined outside this view; presumably it reports that
// no candidate can reach the majority any more - TODO confirm
else if (this._isVotingFailed())
{
this._nextTerm(0);
}
}
// Handle a PING (leader heartbeat) received from node <from>.
_onReceivePing(from, msg)
{
// The node we voted for confirms its leadership in the same term: start following it
if (this.state == CANDIDATE && this.term == msg.term && from == this.leader)
{
this.state = FOLLOWER;
this.emit('change', { state: this.state, term: this.term, leader: this.leader });
}
// Heartbeat from our leader: restart the election timer
if (this.state == FOLLOWER && from == this.leader)
{
this.markAlive();
}
// Reply when leadership expiration is enabled, and also when we follow
// a different leader than the sender, so that the sender learns about it
if (this.leadershipTimeout > 0 || this.state == FOLLOWER && from != this.leader)
{
this.send(from, { type: PONG, term: this.term, leader: this.leader });
}
}
_onReceivePong(from, msg)
{
if (this.state != LEADER)
{
return;
}
if (msg.leader != this.nodeId || msg.term != this.term)
{
this.start();
}
else if (this.leadershipTimeout > 0)
{
this.confirmed[from] = true;
if (Object.keys(this.confirmed).length >= (1 + (0 | this.nodes.length/2)))
{
// We have quorum
this._nextTerm(this.leadershipTimeout);
}
}
}
markAlive()
{
this.leaderTimedOut = false;
this._nextTerm(this.heartbeatTimeout*2 + this.electionTimeout);
}
@ -233,16 +312,22 @@ class TinyRaft extends EventEmitter
if (this.state == LEADER)
{
this.votes[this.nodeId] = this.votes[this.nodeId].filter(n => nodes.indexOf(n) >= 0);
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
if (this.votes[this.nodeId].length < (1 + (0 | this.nodes.length/2)))
this.start();
else
this.emit('change', { state: this.state, term: this.term, leader: this.nodeId, followers: this.votes[this.nodeId] });
}
else if (this.state == FOLLOWER && nodes.indexOf(this.leader) < 0 || this.state == CANDIDATE)
{
r.nextTerm(-1);
r.start();
this.start();
}
}
}
TinyRaft.setTimeout = setTimeout;
TinyRaft.clearTimeout = clearTimeout;
TinyRaft.setInterval = setInterval;
TinyRaft.clearInterval = clearInterval;
TinyRaft.VOTE_REQUEST = VOTE_REQUEST;
TinyRaft.VOTE = VOTE;
TinyRaft.PING = PING;

View File

@ -1,9 +1,16 @@
const TinyRaft = require('./tinyraft.js');
const FakeTimer = require('./faketimer.js');
function newNode(id, nodes, partitions)
const fake = new FakeTimer();
TinyRaft.setTimeout = fake.setTimeout;
TinyRaft.clearTimeout = fake.clearTimeout;
TinyRaft.setInterval = fake.setInterval;
TinyRaft.clearInterval = fake.clearInterval;
function newNode(id, nodes, partitions, mod)
{
partitions = partitions || {};
let n = new TinyRaft({
let cfg = {
nodes: [ 1, 2, 3, 4, 5 ],
nodeId: id,
heartbeatTimeout: 100,
@ -12,28 +19,33 @@ function newNode(id, nodes, partitions)
{
if (!partitions[n.nodeId+'-'+to] && !partitions[to+'-'+n.nodeId] && nodes[to])
{
console.log('received from '+n.nodeId+' to '+to+': '+JSON.stringify(msg));
setImmediate(function() { nodes[to].onReceive(n.nodeId, msg); });
console.log('['+fake.time+'] received from '+n.nodeId+' to '+to+': '+JSON.stringify(msg));
fake.setImmediate(function() { nodes[to].onReceive(n.nodeId, msg); });
}
else
console.log('['+fake.time+'] failed to send from '+n.nodeId+' to '+to+': '+JSON.stringify(msg));
},
});
};
if (mod)
mod(cfg);
let n = new TinyRaft(cfg);
n.on('change', (st) =>
{
console.log(
'node '+n.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+
'['+fake.time+'] node '+n.nodeId+': '+(st.state == TinyRaft.FOLLOWER ? 'following '+st.leader : st.state)+
', term '+st.term+(st.state == TinyRaft.LEADER ? ', followers: '+st.followers.join(', ') : '')
);
});
nodes[id] = n;
}
function newNodes(count, partitions)
function newNodes(count, partitions, mod)
{
partitions = partitions || {};
const nodes = {};
for (let i = 1; i <= count; i++)
{
newNode(i, nodes, partitions);
newNode(i, nodes, partitions, mod);
}
for (let i = 1; i <= count; i++)
{
@ -45,6 +57,7 @@ function newNodes(count, partitions)
function checkQuorum(nodes, count)
{
let leaders = 0;
let out = 0;
for (const i in nodes)
{
if (nodes[i].state == TinyRaft.LEADER)
@ -55,14 +68,16 @@ function checkQuorum(nodes, count)
}
else if (nodes[i].state != TinyRaft.FOLLOWER)
{
throw new Error('node '+i+' is not in quorum: state '+nodes[i].state);
out++;
if (out > nodes.length-count)
throw new Error('node '+i+' is not in quorum: state '+nodes[i].state);
}
}
if (leaders != 1)
{
throw new Error('we have '+leaders+' leaders instead of exactly 1');
}
console.log('OK: '+count+' nodes in quorum');
console.log('['+fake.time+'] OK: '+count+' nodes in quorum');
}
function checkNoQuorum(nodes)
@ -72,11 +87,12 @@ function checkNoQuorum(nodes)
{
throw new Error('we have non-candidates ('+nc.map(n => n.nodeId).join(', ')+'), but we should not');
}
console.log('OK: '+Object.keys(nodes).length+' candidates, no quorum');
console.log('['+fake.time+'] OK: '+Object.keys(nodes).length+' candidates, no quorum');
}
async function testStartThenRemoveNode()
{
console.log('--------------------------------------------------------------------------------');
console.log('testStartThenRemoveNode');
const nodes = newNodes(5);
let leaderChanges = 0, prevLeader = null;
@ -88,7 +104,7 @@ async function testStartThenRemoveNode()
leaderChanges++;
}
});
await new Promise(ok => setTimeout(ok, 2000));
fake.runFor(2000);
checkQuorum(nodes, 5);
if (leaderChanges >= 3)
{
@ -100,7 +116,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop();
delete nodes[leader];
// Check quorum after 2000ms
await new Promise(ok => setTimeout(ok, 2000));
fake.runFor(2000);
checkQuorum(nodes, 4);
// Stop the leader again
leader = nodes[Object.keys(nodes)[0]].leader;
@ -108,7 +124,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop();
delete nodes[leader];
// Check quorum after 2000ms
await new Promise(ok => setTimeout(ok, 2000));
fake.runFor(2000);
checkQuorum(nodes, 3);
// Stop the leader again
leader = nodes[Object.keys(nodes)[0]].leader;
@ -116,7 +132,7 @@ async function testStartThenRemoveNode()
nodes[leader].stop();
delete nodes[leader];
// Check that no quorum exists
await new Promise(ok => setTimeout(ok, 2000));
fake.runFor(2000);
checkNoQuorum(nodes);
// Clean up
for (const id in nodes)
@ -128,9 +144,10 @@ async function testStartThenRemoveNode()
async function testAddNode()
{
console.log('--------------------------------------------------------------------------------');
console.log('testAddNode');
const nodes = newNodes(5);
await new Promise(ok => setTimeout(ok, 2000));
const nodes = newNodes(5, {}, cfg => cfg.initialTerm = 1000);
fake.runFor(2000);
checkQuorum(nodes, 5);
// Add node
newNode(6, nodes);
@ -138,7 +155,7 @@ async function testAddNode()
nodes[i].setNodes([ 1, 2, 3, 4, 5, 6 ]);
nodes[6].start();
// Check quorum after 2000ms
await new Promise(ok => setTimeout(ok, 2000));
fake.runFor(2000);
checkQuorum(nodes, 6);
// Clean up
for (const id in nodes)
@ -148,10 +165,200 @@ async function testAddNode()
console.log('testAddNode: OK');
}
// Test: a leader that is cut off from all other nodes must give up
// leadership (become a candidate again) when leadershipTimeout is enabled.
async function testLeadershipExpiration()
{
    console.log('--------------------------------------------------------------------------------');
    console.log('testLeadershipExpiration');
    // The same object is consulted by every node's send(), so links can be cut later
    const partitions = {};
    const nodes = newNodes(5, partitions, (cfg) => { cfg.leadershipTimeout = 1500; });
    // All 5 nodes should agree on a leader within 2000ms
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    // Cut every link from and to the leader
    const isolated = nodes[1].leader;
    console.log("["+fake.time+"] --> stopping the leader's ("+isolated+") network");
    for (const peer of [ 1, 2, 3, 4, 5 ])
    {
        partitions[peer+'-'+isolated] = true;
        partitions[isolated+'-'+peer] = true;
    }
    // After 2 * leadershipTimeout the isolated node must not be a leader any more
    fake.runFor(3000);
    const expired = (nodes[isolated].state == TinyRaft.CANDIDATE);
    if (!expired)
    {
        throw new Error("leadership expiration doesn't work");
    }
    // Clean up
    Object.keys(nodes).forEach((id) => nodes[id].stop());
    console.log('testLeadershipExpiration: OK');
}
// Test: restarting a follower must not cause a leader change.
// Node 2 is used as the observer of leader changes.
async function testRestart()
{
    console.log('--------------------------------------------------------------------------------');
    console.log('testRestart');
    const nodes = newNodes(5, {}, (cfg) => { cfg.initialTerm = 1000; });
    let prevLeader = null;
    let leaderChanges = 0;
    nodes[2].on('change', (st) =>
    {
        // A candidate has no leader yet
        const seen = (st.state == TinyRaft.CANDIDATE ? null : st.leader);
        if (seen == prevLeader)
        {
            return;
        }
        prevLeader = seen;
        leaderChanges++;
    });
    // All 5 nodes should agree on a leader within 2000ms
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    if (leaderChanges >= 3)
    {
        throw new Error("leaderChanges = "+leaderChanges+" (expected < 3)");
    }
    // Pick a follower which is neither the leader nor the observer (node 2)
    let restarted = 1 + (prevLeader % 5);
    if (restarted == 2)
    {
        restarted = 1 + (prevLeader + 1) % 5;
    }
    console.log("["+fake.time+"] --> stopping a follower (node "+restarted+")");
    nodes[restarted].stop();
    delete nodes[restarted];
    // Let the cluster run without it for 2000ms
    fake.runFor(2000);
    // Bring the follower back as a fresh node
    console.log("["+fake.time+"] --> restarting a follower (node "+restarted+")");
    leaderChanges = 0;
    newNode(restarted, nodes, {}, null);
    nodes[restarted].start();
    // After 2000ms everyone must be in quorum again and the leader must be the same
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    if (leaderChanges > 0)
    {
        throw new Error("leader changed after restart of a follower");
    }
    // Clean up
    Object.keys(nodes).forEach((id) => nodes[id].stop());
    console.log('testRestart: OK');
}
// Test: replace a stopped node (4) with a new one (6) and check that
// the changed set of 5 nodes reaches quorum again.
async function testChangeNodes()
{
    console.log('--------------------------------------------------------------------------------');
    console.log('testChangeNodes');
    console.log('['+fake.time+'] --> starting nodes 1-5');
    const nodes = newNodes(5, {}, (cfg) => { cfg.initialTerm = 1000; });
    // All 5 nodes should agree on a leader within 2000ms
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    // Take node 4 away
    console.log('['+fake.time+'] --> stopping node 4');
    nodes[4].stop();
    delete nodes[4];
    fake.runFor(1000);
    // Switch the membership from 1 2 3 4 5 to 1 2 3 5 6
    console.log('['+fake.time+'] --> starting node 6');
    newNode(6, nodes);
    nodes[6].start();
    for (const member of [ 1, 2, 3, 5, 6 ])
    {
        // Every node gets its own copy of the list
        nodes[member].setNodes([ 1, 2, 3, 5, 6 ]);
    }
    // The new set of 5 nodes should be in quorum after 2000ms
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    // Clean up
    Object.keys(nodes).forEach((id) =>
    {
        if (nodes[id])
            nodes[id].stop();
    });
    console.log('testChangeNodes: OK');
}
// Test: with leaderPriority = nodeId+1 the alive node with the highest ID
// must always win the election.
async function testLeaderPriority()
{
    console.log('--------------------------------------------------------------------------------');
    console.log('testLeaderPriority');
    console.log('['+fake.time+'] --> starting nodes 1-5');
    const nodes = newNodes(5, {}, (cfg) => { cfg.leaderPriority = cfg.nodeId+1; });
    // The leader is checked from node 1's point of view
    const expectLeader = (id) =>
    {
        if (nodes[1].leader != id)
        {
            throw new Error('leader is not '+id);
        }
    };
    // All 5 nodes should be in quorum after 2000ms, led by node 5
    fake.runFor(2000);
    checkQuorum(nodes, 5);
    expectLeader(5);
    // Stop the current leader twice; leadership must pass to 4 and then to 3
    const steps = [
        { stop: 5, alive: 4, leader: 4 },
        { stop: 4, alive: 3, leader: 3 },
    ];
    for (const step of steps)
    {
        console.log('['+fake.time+'] --> stopping node '+step.stop);
        nodes[step.stop].stop();
        delete nodes[step.stop];
        fake.runFor(2000);
        checkQuorum(nodes, step.alive);
        expectLeader(step.leader);
    }
    // Clean up
    Object.keys(nodes).forEach((id) =>
    {
        if (nodes[id])
            nodes[id].stop();
    });
    console.log('testLeaderPriority: OK');
}
// Test: 3 nodes where nodes 1 and 3 cannot talk to each other (both can reach node 2).
async function testPartition1_3()
{
    console.log('--------------------------------------------------------------------------------');
    console.log('testPartition1_3');
    const partitions = { '1-3': true, '3-1': true };
    const nodes = newNodes(3, partitions, (cfg) => { cfg.nodes = [ 1, 2, 3 ]; });
    // Check that 2 or 3 nodes are in quorum after 5000ms
    // This situation should be fixed by "prevote protocol", but it breaks other things
    fake.runFor(5000);
    // Node 2 reaches both others, so all 3 are expected in quorum when it leads; otherwise 2
    const expected = (nodes[2].leader == 2 ? 3 : 2);
    checkQuorum(nodes, expected);
    // Clean up
    Object.keys(nodes).forEach((id) => nodes[id].stop());
    console.log('testPartition1_3: OK');
}
// Run all test scenarios one after another, then exit the process successfully.
// A failing scenario throws, so the exit call is not reached in that case.
async function run()
{
    const scenarios = [
        testStartThenRemoveNode,
        testAddNode,
        testLeadershipExpiration,
        testRestart,
        testChangeNodes,
        testLeaderPriority,
        testPartition1_3,
    ];
    for (const scenario of scenarios)
    {
        await scenario();
    }
    process.exit(0);
}

View File

@ -0,0 +1,44 @@
/**
 * Build a filter deciding which keys are persisted and in which form.
 *
 * The returned function(key, value) gives back:
 * - for <prefix>/osd/stats/*: a JSON string holding only bitmap_granularity,
 *   data_block_size, host and immediate_commit of the original value
 *   (undefined when the value is empty);
 * - undefined for other keys under <prefix>/osd/, <prefix>/inode/stats/,
 *   <prefix>/pg/stats/, <prefix>/pool/stats/ and for <prefix>/stats itself;
 * - the value unchanged for everything else.
 *
 * NOTE(review): when the OSD stats value can not be processed, an empty object
 * (not a string) is returned - confirm that callers expect that.
 *
 * @param {{vitastor_prefix?: string}} cfg configuration; the prefix defaults to '/vitastor'
 * @returns {function(string, *): *} the filter function
 */
function vitastor_persist_filter(cfg)
{
    const prefix = cfg.vitastor_prefix || '/vitastor';
    // true if <key> begins with <prefix><suffix>
    const under = (key, suffix) => key.substr(0, prefix.length+suffix.length) == prefix+suffix;
    const skipped_dirs = [ '/osd/', '/inode/stats/', '/pg/stats/', '/pool/stats/' ];
    return (key, value) =>
    {
        if (under(key, '/osd/stats/'))
        {
            if (!value)
            {
                return undefined;
            }
            // Holds the raw text until parsing succeeds, so the error message shows what failed
            let stats = value;
            try
            {
                stats = JSON.parse(value);
                return JSON.stringify({
                    bitmap_granularity: stats.bitmap_granularity || undefined,
                    data_block_size: stats.data_block_size || undefined,
                    host: stats.host || undefined,
                    immediate_commit: stats.immediate_commit || undefined,
                });
            }
            catch (e)
            {
                console.error('invalid JSON in '+key+' = '+stats+': '+e);
                return {};
            }
        }
        const skipped = skipped_dirs.some((dir) => under(key, dir)) || key == prefix+'/stats';
        return skipped ? undefined : value;
    };
}
module.exports = vitastor_persist_filter;