Split txn_action

master
Vitaliy Filippov 2024-05-04 12:03:34 +03:00
parent 857cf668f2
commit 74a77a3974
2 changed files with 96 additions and 82 deletions

View File

@ -276,7 +276,9 @@ class AntiEtcd
unsubscribeClient(client_id)
{
if (!this.clients[client_id])
{
return;
}
for (const watch_id in this.clients[client_id].watches)
{
const mapped_id = this.clients[client_id].watches[watch_id];

View File

@ -377,7 +377,7 @@ class EtcTree
const notifications = [];
for (const key in this.leases[req.ID].keys)
{
this.txn_action({ request_delete_range: { key } }, next_revision, notifications);
this._delete_range({ key }, next_revision, notifications);
}
this.notify(notifications);
return { header: { revision: this.mod_revision } };
@ -483,20 +483,6 @@ class EtcTree
return { canceled: true };
}
api_txn({ compare, success, failure })
{
const failed = (compare || []).filter(chk => !this.check(chk)).length > 0;
const responses = [];
const notifications = [];
const next_revision = this.mod_revision + 1;
for (const req of (failed ? failure : success) || [])
{
responses.push(this.txn_action(req, next_revision, notifications));
}
this.notify(notifications);
return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
}
notify(notifications)
{
if (!notifications.length)
@ -525,86 +511,112 @@ class EtcTree
}
}
txn_action(req, cur_revision, notifications)
api_txn({ compare, success, failure })
{
const failed = (compare || []).filter(chk => !this.check(chk)).length > 0;
const responses = [];
const notifications = [];
const next_revision = this.mod_revision + 1;
for (const req of (failed ? failure : success) || [])
{
responses.push(this._txn_action(req, next_revision, notifications));
}
this.notify(notifications);
return { header: { revision: this.mod_revision }, succeeded: !failed, responses };
}
_txn_action(req, cur_revision, notifications)
{
if (req.request_range || req.requestRange)
{
const request_range = req.request_range || req.requestRange;
// FIXME: limit, revision(-), sort_order, sort_target, serializable(-),
// count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
const { parts, all } = this.get_range(request_range);
const { cur } = this.get_subtree(parts, false, false);
const kvs = [];
if (cur)
{
this.get_all(kvs, cur, all, parts.join('/') || null, request_range);
}
return { response_range: { kvs } };
return { response_range: this._range(req.request_range || req.requestRange) };
}
else if (req.request_put || req.requestPut)
{
const request_put = req.request_put || req.requestPut;
// FIXME: prev_kv, ignore_value(?), ignore_lease(?)
const parts = this.key_parts(this.de64(request_put.key));
const key = parts.join('/');
const value = this.de64(request_put.value);
const { cur, watchers } = this.get_subtree(parts, true, true);
if (cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
if (!eq(cur.value, value) || cur.lease != request_put.lease)
{
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[key];
}
if (request_put.lease)
{
if (!this.leases[request_put.lease])
{
throw new Error('unknown lease: '+request_put.lease);
}
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;
}
else if (cur.lease)
{
cur.lease = null;
}
this.mod_revision = cur_revision;
cur.version = (cur.version||0) + 1;
cur.mod_revision = cur_revision;
if (cur.value == null)
{
cur.create_revision = cur_revision;
}
cur.value = value;
const notify = { watchers, key: this.b64(key), value: this.b64(value), mod_revision: cur.mod_revision };
if (cur.lease)
{
notify.lease = cur.lease;
}
notifications.push(notify);
}
return { response_put: {} };
return { response_put: this._put(req.request_put || req.requestPut, cur_revision, notifications) }
}
else if (req.request_delete_range || req.requestDeleteRange)
{
const request_delete_range = req.request_delete_range || req.requestDeleteRange;
// FIXME: prev_kv
const { parts, all } = this.get_range(request_delete_range);
const { cur, watchers } = this.get_subtree(parts, false, true);
const prevcount = notifications.length;
if (cur)
{
this.delete_all(notifications, watchers, cur, all, parts.join('/') || null, cur_revision);
}
return { response_delete_range: { deleted: notifications.length-prevcount } };
return { response_delete_range: this._delete_range(req.request_delete_range || req.requestDeleteRange, cur_revision, notifications) };
}
return {};
}
_range(request_range)
{
// FIXME: limit, revision(-), sort_order, sort_target, serializable(-),
// count_only, min_mod_revision, max_mod_revision, min_create_revision, max_create_revision
const { parts, all } = this.get_range(request_range);
const { cur } = this.get_subtree(parts, false, false);
const kvs = [];
if (cur)
{
this.get_all(kvs, cur, all, parts.join('/') || null, request_range);
}
return { kvs };
}
_put(request_put, cur_revision, notifications)
{
// FIXME: prev_kv, ignore_value(?), ignore_lease(?)
const parts = this.key_parts(this.de64(request_put.key));
const key = parts.join('/');
const value = this.de64(request_put.value);
const { cur, watchers } = this.get_subtree(parts, true, true);
if (cur.key_watchers)
{
watchers.push.apply(watchers, cur.key_watchers);
}
if (!eq(cur.value, value) || cur.lease != request_put.lease)
{
if (cur.lease && this.leases[cur.lease])
{
delete this.leases[cur.lease].keys[key];
}
if (request_put.lease)
{
if (!this.leases[request_put.lease])
{
throw new Error('unknown lease: '+request_put.lease);
}
cur.lease = request_put.lease;
this.leases[request_put.lease].keys[key] = true;
}
else if (cur.lease)
{
cur.lease = null;
}
this.mod_revision = cur_revision;
cur.version = (cur.version||0) + 1;
cur.mod_revision = cur_revision;
if (cur.value == null)
{
cur.create_revision = cur_revision;
}
cur.value = value;
const notify = { watchers, key: this.b64(key), value: this.b64(value), mod_revision: cur.mod_revision };
if (cur.lease)
{
notify.lease = cur.lease;
}
notifications.push(notify);
}
return {};
}
_delete_range(request_delete_range, cur_revision, notifications)
{
// FIXME: prev_kv
const { parts, all } = this.get_range(request_delete_range);
const { cur, watchers } = this.get_subtree(parts, false, true);
const prevcount = notifications.length;
if (cur)
{
this.delete_all(notifications, watchers, cur, all, parts.join('/') || null, cur_revision);
}
return { deleted: notifications.length-prevcount };
}
get_all(kvs, cur, all, prefix, req)
{
if (req.limit && kvs.length > req.limit)