Compare commits

...

10 Commits

Author SHA1 Message Date
Vitaliy Filippov 9f6eb4570b Fix package.json 2024-06-01 20:39:05 +03:00
Vitaliy Filippov 45543b2891 Add raftchange event to AntiEtcd type definitions 2024-06-01 17:14:14 +03:00
Vitaliy Filippov f94c106581 Add client docs to readme 2024-06-01 17:14:14 +03:00
Vitaliy Filippov 19a24b0157 Add TLS cert/key support to cli 2024-06-01 17:14:14 +03:00
Vitaliy Filippov 1c3e5020dd Add TS type definitions for AntiEtcd 2024-06-01 17:14:14 +03:00
Vitaliy Filippov 068ed6bed7 Run deletion compaction also in non-clustered mode 2024-06-01 17:14:14 +03:00
Vitaliy Filippov 3dccbb2218 Add TOC 2024-06-01 17:14:14 +03:00
Vitaliy Filippov b39c5c6f49 Implement embeddable watch API 2024-06-01 17:14:14 +03:00
Vitaliy Filippov de80a03e10 Add readme for AntiEtcd 2024-06-01 17:14:14 +03:00
Vitaliy Filippov ef1ef46d7e Multiple polishing fixes
- Add support for HTTPS client certificates
- Make requests wait for quorum by default
- Extract vitastor_persist_filter from main code
- Do not use Promise.allSettled
- Throw 501 in etctree on unsupported features
2024-06-01 17:14:14 +03:00
10 changed files with 857 additions and 75 deletions

498
README.md Normal file
View File

@ -0,0 +1,498 @@
# AntiEtcd
Simplistic miniature etcd replacement based on [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)
- Embeddable
- REST API only, gRPC is shit and will never be supported
- [TinyRaft](https://git.yourcmc.ru/vitalif/tinyraft/)-based leader election
- Websocket-based cluster communication
- Supports a limited subset of etcd REST APIs
- With optional persistence
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or [VNPL-1.1](https://git.yourcmc.ru/vitalif/vitastor/src/branch/master/VNPL-1.1.txt)
# Contents
- [CLI Usage](#cli-usage)
- [CLI Client](#cli-client)
- [Options](#options)
- [HTTP](#http)
- [Persistence](#persistence)
- [Clustering](#clustering)
- [Embedded Usage](#embedded-usage)
- [About Persistence](#about-persistence)
- [Supported etcd APIs](#supported-etcd-apis)
- [/v3/kv/txn](#v3-kv-txn)
- [/v3/kv/put](#v3-kv-put)
- [/v3/kv/range](#v3-kv-range)
- [/v3/kv/deleterange](#v3-kv-deleterange)
- [/v3/lease/grant](#v3-lease-grant)
- [/v3/lease/keepalive](#v3-lease-keepalive)
- [/v3/lease/revoke or /v3/kv/lease/revoke](#v3-lease-revoke-or-v3-kv-lease-revoke)
- [Websocket-based watch APIs](#websocket-based-watch-apis)
- [HTTP Error Codes](#http-error-codes)
## CLI Usage
```
npm install antietcd
node_modules/.bin/antietcd \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381]
[other options]
```
Antietcd doesn't background itself, so use systemd or start-stop-daemon to run it as a background service.
### CLI Client
```
node_modules/.bin/anticli [OPTIONS] put <key> [<value>]
node_modules/.bin/anticli [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
node_modules/.bin/anticli [OPTIONS] del <key> [-p|--prefix]
```
For `put`, if `<value>` is not specified, it will be read from STDIN.
Options:
<dl>
<dt>--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379</dt>
<dd>Specify HTTP endpoints to connect to</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--timeout 1000</dt>
<dd>Specify request timeout in milliseconds</dd>
</dl>
## Options
### HTTP
<dl>
<dt>--port 2379</dt>
<dd>Listen port</dd>
<dt>--cert &lt;cert&gt;</dt>
<dd>Use TLS with this certificate file (PEM format)</dd>
<dt>--key &lt;key&gt;</dt>
<dd>Use TLS with this key file (PEM format)</dd>
<dt>--ca &lt;ca&gt;</dt>
<dd>Use trusted root certificates from this file.
Specify &lt;ca&gt; = &lt;cert&gt; if your certificate is self-signed.</dd>
<dt>--client_cert_auth 1</dt>
<dd>Require TLS client certificates signed by <ca> or by default CA to connect.</dd>
<dt>--ws_keepalive_interval 30000</dt>
<dd>Client websocket ping (keepalive) interval in milliseconds</dd>
</dl>
### Persistence
<dl>
<dt>--data &lt;filename&gt;</dt>
<dd>Store persistent data in &lt;filename&gt;</dd>
<dt>--persist_interval &lt;milliseconds&gt;</dt>
<dd>Persist data on disk after this interval, not immediately after change</dd>
<dt>--persist_filter ./filter.js</dt>
<dd>Use persistence filter from ./filter.js (or a module). <br />
Persistence filter is a function(cfg) returning function(key, value) ran
for every change and returning a new value or undefined to skip persistence.</dd>
<dt>--compact_revisions 1000</dt>
<dd>Number of previous revisions to keep deletion information in memory</dd>
</dl>
### Clustering
<dl>
<dt>--node_id &lt;id></dt>
<dd>ID of this cluster node</dd>
<dt>--cluster &lt;id1&gt;=&lt;url1&gt;,&lt;id2&gt;=&lt;url2&gt;,...</dt>
<dd>All other cluster nodes</dd>
<dt>--cluster_key &lt;key&gt;</dt>
<dd>Shared cluster key for identification</dd>
<dt>--election_timeout 5000</dt>
<dd>Raft election timeout</dd>
<dt>--heartbeat_timeout 1000</dt>
<dd>Raft leader heartbeat timeout</dd>
<dt>--wait_quorum_timeout 30000</dt>
<dd>Timeout for requests to wait for quorum to come up</dd>
<dt>--leader_priority &lt;number&gt;</dt>
<dd>Raft leader priority for this node (optional)</dd>
<dt>--stale_read 1</dt>
<dd>Allow to serve reads from followers. Specify 0 to disallow</dd>
<dt>--reconnect_interval 1000</dt>
<dd>Unavailable peer connection retry interval</dd>
<dt>--dump_timeout 5000</dt>
<dd>Timeout for dump command in milliseconds</dd>
<dt>--load_timeout 5000</dt>
<dd>Timeout for load command in milliseconds</dd>
<dt>--forward_timeout 1000</dt>
<dd>Timeout for forwarding requests from follower to leader in milliseconds</dd>
<dt>--replication_timeout 1000</dt>
<dd>Timeout for replicating requests from leader to follower in milliseconds</dd>
<dt>--compact_timeout 1000</dt>
<dd>Timeout for compaction requests from leader to follower in milliseconds</dd>
</dl>
## Embedded Usage
```js
const AntiEtcd = require('antietcd');
// Configuration may contain all the same options like in CLI, without "--"
// Except that persist_filter should be a callback (key, value) => newValue
const srv = new AntiEtcd({ ...configuration });
// Start server
srv.start();
// Make a local API call in generic style:
let res = await srv.api('kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', { ...params });
// Or function-style:
res = await srv.txn(params);
res = await srv.range(params);
res = await srv.put(params);
res = await srv.deleterange(params);
res = await srv.lease_grant(params);
res = await srv.lease_revoke(params);
res = await srv.lease_keepalive(params);
// Error handling:
try
{
res = await srv.txn(params);
}
catch (e)
{
if (e instanceof AntiEtcd.RequestError)
{
// e.code is HTTP code
// e.message is error message
}
}
// Watch API:
const watch_id = await srv.create_watch(params, (message) => console.log(message));
await srv.cancel_watch(watch_id);
// Stop server
srv.stop();
```
## About Persistence
Persistence is very simple: full database is dumped into JSON, gzipped and saved as file.
By default, it is written and fsynced on disk on every change, but it can be configured
to dump DB on disk at fixed intervals, for example, at most every 500 ms - of course,
at expense of slightly reduced crash resiliency (example: `--persist_interval 500`).
You can also specify a filter to exclude some data from persistence by using the option
`--persist_filter ./filter.js`. Persistence filter code example:
```js
function example_filter(cfg)
{
// <cfg> contains all command-line options
const prefix = cfg.exclude_keys;
if (!prefix)
{
return null;
}
return (key, value) =>
{
if (key.substr(0, prefix.length) == prefix)
{
// Skip all keys with prefix from persistence
return undefined;
}
if (key === '/statistics')
{
// Return <unneeded_key> from inside value
const decoded = JSON.parse(value);
return JSON.stringify({ ...decoded, unneeded_key: undefined });
}
return value;
};
}
module.exports = example_filter;
```
## Supported etcd APIs
NOTE: `key`, `value` and `range_end` are always encoded in base64, like in original etcd.
Range requests are only supported across "directories" separated by `/`.
It means that in range requests `key` must always end with `/` and `range_end` must always
end with `0`, and that such request will return a whole subtree of keys.
### /v3/kv/txn
Request:
```ts
type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
serializable?: boolean,
}
```
`serializable` allows to serve read-only requests from follower even if `stale_read` is not enabled.
Response:
```ts
type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
}
```
### /v3/kv/put
Request:
```ts
type PutRequest = {
key: string,
value: string,
lease?: string,
}
```
Other parameters are not supported: prev_kv, ignore_value, ignore_lease.
Response:
```ts
type PutResponse = {
header: { revision: number },
}
```
### /v3/kv/range
Request:
```ts
type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
serializable?: boolean,
}
```
`serializable` allows to serve read-only requests from follower even if `stale_read` is not enabled.
Other parameters are not supported: revision, limit, sort_order, sort_target,
count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision.
Response:
```ts
type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
}
```
### /v3/kv/deleterange
Request:
```ts
type DeleteRangeRequest = {
key: string,
range_end?: string,
}
```
Other parameters are not supported: prev_kv.
Response:
```ts
type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
}
```
### /v3/lease/grant
Request:
```ts
type LeaseGrantRequest = {
ID?: string,
TTL: number,
}
```
Response:
```ts
type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
}
```
### /v3/lease/keepalive
Request:
```ts
type LeaseKeepaliveRequest = {
ID: string,
}
```
Response:
```ts
type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
}
```
### /v3/lease/revoke or /v3/kv/lease/revoke
Request:
```ts
type LeaseRevokeRequest = {
ID: string,
}
```
Response:
```ts
type LeaseRevokeResponse = {
header: { revision: number },
}
```
### Websocket-based watch APIs
Client-to-server message format:
```ts
type ClientMessage =
{ create_request: {
key: string,
range_end?: string,
start_revision?: number,
watch_id?: string,
} }
| { cancel_request: {
watch_id: string,
} }
| { progress_request: {} }
```
Server-to-client message format:
```ts
type ServerMessage = {
result: {
header: { revision: number },
watch_id: string,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' }
```
### HTTP Error Codes
- 400 for invalid requests
- 404 for unsupported API / URL not found
- 405 for non-POST request method
- 501 for unsupported API feature - non-directory range queries and so on
- 502 for server is stopping
- 503 for quorum-related errors - quorum not available and so on

View File

@ -4,6 +4,22 @@ const fsp = require('fs').promises;
const http = require('http');
const https = require('https');
const help_text = `CLI for AntiEtcd
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
anticli.js [OPTIONS] put <key> [<value>]
anticli.js [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]
anticli.js [OPTIONS] del <key> [-p|--prefix]
Options:
[--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379]
[--cert cert.pem] [--key key.pem] [--timeout 1000]
`;
class AntiEtcdCli
{
static parse(args)
@ -15,15 +31,7 @@ class AntiEtcdCli
const arg = args[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
if (arg === '-h' || arg === '--help')
{
process.stderr.write(
'USAGE:\n'+
' anticli.js [OPTIONS] put <key> [<value>]\n'+
' anticli.js [OPTIONS] get <key> [-p|--prefix] [-v|--print-value-only] [-k|--keys-only]\n'+
' anticli.js [OPTIONS] del <key> [-p|--prefix]\n'+
'OPTIONS:\n'+
' [--endpoints|-e http://node1:2379,http://node2:2379,http://node3:2379]\n'+
' [--timeout 1000]'
);
process.stderr.write(help_text);
process.exit();
}
else if (arg == '-e' || arg == '--endpoints')
@ -71,6 +79,13 @@ class AntiEtcdCli
{
this.options.endpoints = [ 'http://localhost:2379' ];
}
if (this.options.cert && this.options.key)
{
this.tls = {
key: await fsp.readFile(this.options.key),
cert: await fsp.readFile(this.options.cert),
};
}
if (cmd[0] == 'get')
{
await this.get(cmd.slice(1));
@ -151,7 +166,7 @@ class AntiEtcdCli
for (const url of this.options.endpoints)
{
const cur_url = url.replace(/\/+$/, '')+path;
const res = await POST(cur_url, body, this.options.timeout||1000);
const res = await POST(cur_url, this.tls||{}, body, this.options.timeout||1000);
if (res.json)
{
if (res.json.error)
@ -180,7 +195,7 @@ class AntiEtcdCli
}
}
function POST(url, body, timeout)
function POST(url, options, body, timeout)
{
return new Promise(ok =>
{
@ -195,7 +210,7 @@ function POST(url, body, timeout)
let req = (url.substr(0, 6).toLowerCase() == 'https://' ? https : http).request(url, { method: 'POST', headers: {
'Content-Type': 'application/json',
'Content-Length': body_text.length,
} }, (res) =>
}, timeout, ...options }, (res) =>
{
if (!req)
{

View File

@ -1,10 +1,14 @@
const ws = require('ws');
const TinyRaft = require('./tinyraft.js');
const TinyRaft = require('tinyraft');
const { runCallbacks, RequestError } = require('./common.js');
const LEADER_MISMATCH = 'raft leader/term mismatch';
const LEADER_ONLY = 1;
const NO_WAIT_QUORUM = 2;
const READ_FROM_FOLLOWER = 4;
class AntiCluster
{
constructor(antietcd)
@ -48,7 +52,7 @@ class AntiCluster
if (node_id != this.cfg.node_id && this.cfg.cluster[node_id] &&
(!this.cluster_connections[node_id] || !this.antietcd.clients[this.cluster_connections[node_id]]))
{
const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws'));
const socket = new ws.WebSocket(this.cfg.cluster[node_id].replace(/^http/, 'ws'), this.antietcd.tls);
const client_id = this.antietcd._startWebsocket(socket, () => setTimeout(() => this.connectToNode(node_id), this.cfg.reconnect_interval||1000));
this.cluster_connections[node_id] = client_id;
socket.on('open', () =>
@ -337,25 +341,33 @@ class AntiCluster
{
return null;
}
if (leaderonly && this.raft.state != TinyRaft.LEADER)
if (leaderonly == LEADER_ONLY && this.raft.state != TinyRaft.LEADER)
{
throw new RequestError(503, 'Not leader');
}
if (this.raft.state == TinyRaft.CANDIDATE)
if (leaderonly == NO_WAIT_QUORUM && this.raft.state == TinyRaft.CANDIDATE)
{
throw new RequestError(503, 'Quorum not available');
}
else if (this.raft.state == TinyRaft.FOLLOWER &&
(!this.cfg.stale_read || this._isWrite(path, data)))
if (!this.synced)
{
// Wait for quorum / initial sync with timeout
await new Promise((ok, no) =>
{
this.wait_sync.push(ok);
setTimeout(() =>
{
this.wait_sync = this.wait_sync.filter(cb => cb != ok);
no(new RequestError(503, 'Quorum not available'));
}, this.cfg.wait_quorum_timeout||30000);
});
}
if (this.raft.state == TinyRaft.FOLLOWER &&
(this._isWrite(path, data) || !this.cfg.stale_read && !(leaderonly & READ_FROM_FOLLOWER)))
{
// Forward to leader
return await this._forwardToLeader(path, data);
}
else if (!this.synced)
{
// Wait for initial sync for read-only requests
await new Promise(ok => this.wait_sync.push(ok));
}
return null;
}
@ -503,4 +515,8 @@ class AntiCluster
}
}
AntiCluster.LEADER_ONLY = LEADER_ONLY;
AntiCluster.NO_WAIT_QUORUM = NO_WAIT_QUORUM;
AntiCluster.READ_FROM_FOLLOWER = READ_FROM_FOLLOWER;
module.exports = AntiCluster;

View File

@ -1,18 +1,17 @@
#!/usr/bin/env node
const AntiEtcd = require('./antietcd.js');
const vitastor_persist_filter = require('./vitastor_persist_filter.js');
const help_text = `Miniature etcd replacement based on TinyRaft
(c) Vitaliy Filippov, 2024
License: Mozilla Public License 2.0
License: Mozilla Public License 2.0 or Vitastor Network Public License 1.1
Usage:
${process.argv[0]} ${process.argv[1]}
[--cert ssl.crt] [--key ssl.key] [--port 12379]
[--data data.gz] [--vitastor-persist-filter /vitastor] [--no-persist-filter] [--persist_interval 500]
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381]
${process.argv[0]} ${process.argv[1]} \
[--cert ssl.crt] [--key ssl.key] [--port 12379] \
[--data data.gz] [--persist-filter ./filter.js] [--persist_interval 500] \
[--node_id node1 --cluster_key abcdef --cluster node1=http://localhost:12379,node2=http://localhost:12380,node3=http://localhost:12381] \
[other options]
Supported etcd REST APIs:
@ -27,23 +26,30 @@ HTTP:
--port 2379
Listen port
--cert <filename>
--cert <cert>
Use TLS with this certificate file (PEM format)
--key <filename>
--key <key>
Use TLS with this key file (PEM format)
--ca <ca>
Use trusted root certificates from this file.
Specify <ca> = <cert> if your certificate is self-signed.
--client_cert_auth 1
Require TLS client certificates signed by <ca> or by default CA to connect.
--ws_keepalive_interval 30000
Client websocket ping (keepalive) interval in milliseconds
Persistence:
--data <filename>
Use <filename> to store persistent data
Store persistent data in <filename>
--persist_interval <milliseconds>
Persist data on disk after this interval, not immediately
--no_persist_filter
Store all data
--vitastor_persist_filter <prefix>
Store only data required for Vitastor with prefix <prefix> on disk
Persist data on disk after this interval, not immediately after change
--persist_filter ./filter.js
Use persistence filter from ./filter.js (or a module).
Persistence filter is a function(cfg) returning function(key, value) ran
for every change and returning a new value or undefined to skip persistence.
--compact_revisions 1000
Number of previous revisions to keep deletion information in memory
Clustering:
@ -57,10 +63,12 @@ Clustering:
Raft election timeout
--heartbeat_timeout 1000
Raft leader heartbeat timeout
--wait_quorum_timeout 30000
Timeout for requests to wait for quorum to come up
--leader_priority <number>
Raft leader priority for this node (optional)
--stale_read 0|1
Allow to serve reads from followers
--stale_read 1
Allow to serve reads from followers. Specify 0 to disallow
--reconnect_interval 1000
Unavailable peer connection retry interval
--dump_timeout 5000
@ -71,38 +79,31 @@ Clustering:
Timeout for forwarding requests from follower to leader in milliseconds
--replication_timeout 1000
Timeout for replicating requests from leader to follower in milliseconds
--compact_revisions 1000
Number of previous revisions to keep deletion information in memory
--compact_timeout 1000
Timeout for compaction requests from leader to follower in milliseconds
`;
function parse()
{
const options = {
persist_filter: vitastor_persist_filter('/vitastor'),
};
const options = { stale_read: 1 };
for (let i = 2; i < process.argv.length; i++)
{
const arg = process.argv[i].toLowerCase().replace(/^--(.+)$/, (m, m1) => '--'+m1.replace(/-/g, '_'));
if (arg === '-h' || arg === '--help')
{
console.error(help_text.trim());
process.stderr.write(help_text);
process.exit();
}
else if (arg == '--no_persist_filter')
{
options['persist_filter'] = null;
}
else if (arg == '--vitastor_persist_filter')
{
options['persist_filter'] = vitastor_persist_filter(process.argv[++i]||'');
}
else if (arg.substr(0, 2) == '--' && arg != '--persist_filter')
else if (arg.substr(0, 2) == '--')
{
options[arg.substr(2)] = process.argv[++i];
}
}
options['stale_read'] = options['stale_read'] === '1' || options['stale_read'] === 'yes' || options['stale_read'] === 'true';
if (options['persist_filter'])
{
options['persist_filter'] = require(options['persist_filter'])(options);
}
return options;
}

155
antietcd.d.ts vendored Normal file
View File

@ -0,0 +1,155 @@
import type { EventEmitter } from 'events';
import type { TinyRaftEvents } from './tinyraft';
export type AntiEtcdEvents = {
raftchange: TinyRaftEvents['change'],
};
export class AntiEtcd extends EventEmitter<AntiEtcdEvents>
{
constructor(cfg: object);
start(): Promise<void>;
stop(): Promise<void>;
api(path: 'kv_txn'|'kv_range'|'kv_put'|'kv_deleterange'|'lease_grant'|'lease_revoke'|'lease_keepalive', params: object): Promise<object>;
txn(params: TxnRequest): Promise<TxnResponse>;
range(params: RangeRequest): Promise<RangeResponse>;
put(params: PutRequest): Promise<PutResponse>;
deleterange(params: DeleteRangeRequest): Promise<DeleteRangeResponse>;
lease_grant(params: LeaseGrantRequest): Promise<LeaseGrantResponse>;
lease_revoke(params: LeaseRevokeRequest): Promise<LeaseRevokeResponse>;
lease_keepalive(params: LeaseKeepaliveRequest): Promise<LeaseKeepaliveResponse>;
create_watch(params: WatchCreateRequest, callback: (ServerMessage) => void): Promise<string|number>;
cancel_watch(watch_id: string|number): Promise<void>;
}
export type TxnRequest = {
compare?: (
{ key: string, target: "MOD", mod_revision: number, result?: "LESS" }
| { key: string, target: "CREATE", create_revision: number, result?: "LESS" }
| { key: string, target: "VERSION", version: number, result?: "LESS" }
| { key: string, target: "LEASE", lease: string, result?: "LESS" }
| { key: string, target: "VALUE", value: string }
)[],
success?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
failure?: (
{ request_put: PutRequest }
| { request_range: RangeRequest }
| { request_delete_range: DeleteRangeRequest }
)[],
serializable?: boolean,
};
export type TxnResponse = {
header: { revision: number },
succeeded: boolean,
responses: (
{ response_put: PutResponse }
| { response_range: RangeResponse }
| { response_delete_range: DeleteRangeResponse }
)[],
};
export type PutRequest = {
key: string,
value: string,
lease?: string,
};
export type PutResponse = {
header: { revision: number },
};
export type RangeRequest = {
key: string,
range_end?: string,
keys_only?: boolean,
serializable?: boolean,
};
export type RangeResponse = {
header: { revision: number },
kvs: { key: string }[] | {
key: string,
value: string,
lease?: string,
mod_revision: number,
}[],
};
export type DeleteRangeRequest = {
key: string,
range_end?: string,
};
export type DeleteRangeResponse = {
header: { revision: number },
// number of deleted keys
deleted: number,
};
export type LeaseGrantRequest = {
ID?: string,
TTL: number,
};
export type LeaseGrantResponse = {
header: { revision: number },
ID: string,
TTL: number,
};
export type LeaseKeepaliveRequest = {
ID: string,
};
export type LeaseKeepaliveResponse = {
result: {
header: { revision: number },
ID: string,
TTL: number,
}
};
export type LeaseRevokeRequest = {
ID: string,
};
export type LeaseRevokeResponse = {
header: { revision: number },
};
export type WatchCreateRequest = {
key: string,
range_end?: string,
start_revision?: number,
watch_id?: string|number,
}
export type ClientMessage =
{ create_request: WatchCreateRequest }
| { cancel_request: { watch_id: string } }
| { progress_request: {} };
export type ServerMessage = {
result: {
header: { revision: number },
watch_id: string|number,
created?: boolean,
canceled?: boolean,
compact_revision?: number,
events?: {
type: 'PUT'|'DELETE',
kv: {
key: string,
value: string,
lease?: string,
mod_revision: number,
},
}[],
}
} | { error: 'bad-json' } | { error: 'empty-message' };

View File

@ -29,6 +29,7 @@ class AntiEtcd extends EventEmitter
this.stopped = false;
this.inflight = 0;
this.wait_inflight = [];
this.api_watches = {};
}
async start()
@ -49,7 +50,18 @@ class AntiEtcd extends EventEmitter
}
if (this.cfg.cert)
{
this.tls = { key: await fsp.readFile(this.cfg.key), cert: await fsp.readFile(this.cfg.cert) };
this.tls = {
key: await fsp.readFile(this.cfg.key),
cert: await fsp.readFile(this.cfg.cert),
};
if (this.cfg.ca)
{
this.tls.ca = await fsp.readFile(this.cfg.ca);
}
if (this.cfg.client_cert_auth)
{
this.tls.requestCert = true;
}
this.server = https.createServer(this.tls, (req, res) => this._handleRequest(req, res));
}
else
@ -93,17 +105,34 @@ class AntiEtcd extends EventEmitter
{
res.push(this.persistence.persistChange(msg));
}
if (res.length)
if (res.length == 1)
{
res = await Promise.allSettled(res);
const errors = res.filter(r => r.status == 'rejected');
if (errors.length)
{
for (const e of errors)
{
console.error(e.reason);
await res[0];
}
throw errors[0].reason;
else if (res.length > 0)
{
let done = 0;
await new Promise((allOk, allNo) =>
{
res.map(promise => promise.then(res =>
{
if ((++done) == res.length)
allOk();
}).catch(e =>
{
console.error(e);
allNo(e);
}));
});
}
if (!this.cluster)
{
// Run deletion compaction without followers
const mod_revision = this.antietcd.etctree.mod_revision;
if (mod_revision - this.antietcd.etctree.compact_revision > (this.cfg.compact_revisions||1000)*2)
{
const revision = mod_revision - (this.cfg.compact_revisions||1000);
this.antietcd.etctree.compact(revision);
}
}
}
@ -263,6 +292,14 @@ class AntiEtcd extends EventEmitter
{
data.leaderonly = true;
}
if (requestUrl.searchParams.get('serializable'))
{
data.serializable = true;
}
if (requestUrl.searchParams.get('nowaitquorum'))
{
data.nowaitquorum = true;
}
try
{
if (requestUrl.pathname.substr(0, 4) == '/v3/')
@ -306,7 +343,13 @@ class AntiEtcd extends EventEmitter
}
if (path !== 'dump' && this.cluster)
{
const res = await this.cluster.checkRaftState(path, data.leaderonly, data);
const res = await this.cluster.checkRaftState(
path,
(data.leaderonly ? AntiCluster.LEADER_ONLY : 0) |
(data.serializable ? AntiCluster.READ_FROM_FOLLOWER : 0) |
(data.nowaitquorum ? AntiCluster.NO_WAIT_QUORUM : 0),
data
);
if (res)
{
return res;
@ -361,6 +404,30 @@ class AntiEtcd extends EventEmitter
return await this.api('lease_keepalive', params);
}
// public watch API
async create_watch(params, callback)
{
const watch = this.etctree.api_create_watch({ ...params, watch_id: null }, callback);
if (!watch.created)
{
throw new RequestError(400, 'Requested watch revision is compacted', { compact_revision: watch.compact_revision });
}
const watch_id = params.watch_id || watch.watch_id;
this.api_watches[watch_id] = watch.watch_id;
return watch_id;
}
async cancel_watch(watch_id)
{
const mapped_id = this.api_watches[watch_id];
if (!mapped_id)
{
throw new RequestError(400, 'Watch not found');
}
this.etctree.api_cancel_watch({ watch_id: mapped_id });
delete this.api_watches[watch_id];
}
// internal handlers
async _handle_kv_txn(data)
{

View File

@ -1,9 +1,10 @@
class RequestError
{
constructor(code, text)
constructor(code, text, details)
{
this.code = code;
this.message = text;
this.details = details;
}
}

View File

@ -1,5 +1,6 @@
const crypto = require('crypto');
const stableStringify = require('./stable-stringify.js');
const { RequestError } = require('./common.js');
/*type TreeNode = {
value?: any,
@ -85,9 +86,14 @@ class EtcTree
check_value = cur && cur.lease;
ref_value = chk.lease;
}
else if (chk.target === 'VALUE')
{
check_value = cur && cur.value;
ref_value = chk.value;
}
else
{
throw new Error('Unsupported comparison target: '+chk.target);
throw new RequestError(501, 'Unsupported comparison target: '+chk.target);
}
if (chk.result === 'LESS')
{
@ -95,7 +101,7 @@ class EtcTree
}
else if (chk.result)
{
throw new Error('Unsupported comparison result: '+chk.result);
throw new RequestError(501, 'Unsupported comparison result: '+chk.result);
}
return check_value == ref_value;
}
@ -113,7 +119,7 @@ class EtcTree
if (end != null && (key[key.length-1] != '/' || end[end.length-1] != '0' ||
end.substr(0, end.length-1) !== key.substr(0, key.length-1)))
{
throw new Error('Non-directory range queries are unsupported');
throw new RequestError(501, 'Non-directory range queries are unsupported');
}
const parts = this._key_parts(key);
return { parts, all: end != null };
@ -380,7 +386,7 @@ class EtcTree
const id = req.ID;
if (!this.leases[id])
{
throw new Error('unknown lease');
throw new RequestError(400, 'unknown lease');
}
const lease = this.leases[id];
if (lease.timer_id)
@ -430,7 +436,7 @@ class EtcTree
{
if (!this.leases[id])
{
throw new Error('unknown lease');
throw new RequestError(400, 'unknown lease');
}
for (const key in this.leases[id].keys)
{
@ -446,7 +452,7 @@ class EtcTree
{
if (no_throw)
return null;
throw new Error('unknown lease');
throw new RequestError(400, 'unknown lease');
}
this.mod_revision++;
this._sync_revoke_lease(req.ID, notifications, this.mod_revision);
@ -540,7 +546,7 @@ class EtcTree
let watch_id = req.watch_id;
if (watch_id instanceof Object)
{
throw new Error('invalid watch_id');
throw new RequestError(400, 'invalid watch_id');
}
if (!watch_id)
{
@ -735,7 +741,7 @@ class EtcTree
{
if (!this.leases[request_put.lease])
{
throw new Error('unknown lease: '+request_put.lease);
throw new RequestError(400, 'unknown lease: '+request_put.lease);
}
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;

View File

@ -11,8 +11,29 @@
"type": "git",
"url": "https://git.yourcmc.ru/vitalif/antietcd"
},
"homepage": "https://git.yourcmc.ru/vitalif/antietcd",
"bugs": {
"url": "https://git.yourcmc.ru/vitalif/antietcd/issues"
},
"files": [
"anticli.js",
"anticluster.js",
"antietcd-app.js",
"antietcd.js",
"antietcd.d.ts",
"antietcd.js",
"antipersistence.js",
"common.js",
"etctree.js",
"stable-stringify.js",
"README.md"
],
"keywords": [
"raft"
"raft",
"etcd",
"quorum",
"leader",
"election"
],
"author": "Vitaliy Filippov",
"license": "MPL-2.0",
@ -24,6 +45,7 @@
"eslint-plugin-node": "^11.1.0"
},
"dependencies": {
"tinyraft": "^1.0.1",
"ws": "^8.17.0"
},
"bin": {

View File

@ -1,5 +1,6 @@
function vitastor_persist_filter(prefix)
function vitastor_persist_filter(cfg)
{
const prefix = cfg.vitastor_prefix || '/vitastor';
return (key, value) =>
{
if (key.substr(0, prefix.length+'/osd/stats/'.length) == prefix+'/osd/stats/')