Compare commits

...

15 Commits

Author SHA1 Message Date
Taylor McKinnon 071cbd03d0 update Dockerfile for new warpscript paths 2021-09-02 09:45:34 -07:00
Taylor McKinnon a09fce2467 cleanup and reorganize warpscript 2021-09-02 09:45:27 -07:00
Taylor McKinnon d0a9a32cb1 remove events from metric caculation 2021-08-20 12:33:28 -07:00
Taylor McKinnon 5dd3a7f68b fix snapshots 2021-08-20 12:31:13 -07:00
Taylor McKinnon 41935ac187 remove old tasks 2021-08-20 12:30:44 -07:00
Taylor McKinnon 84366f98ad make snapshots include repair checkpoints 2021-08-20 11:45:06 -07:00
Taylor McKinnon 802434f312 remove uneeded repair task 2021-08-20 11:39:39 -07:00
Taylor McKinnon 74ccd4e8ca remove uneeded checkpoint task 2021-08-20 11:39:14 -07:00
Taylor McKinnon dec8f68ae3 implement checkpoint calculation 2021-08-20 11:35:21 -07:00
Taylor McKinnon e13e428dec allow mixed classes and labels in warp10 ingest 2021-08-20 11:34:54 -07:00
Taylor McKinnon 034baca47a convert ingestShards to use new iterator approach 2021-08-20 10:40:38 -07:00
Taylor McKinnon 3da9b8642e convert cache client wrapper 2021-08-20 10:19:18 -07:00
Taylor McKinnon 26b7dc5284 convert memory backend 2021-08-20 10:18:33 -07:00
Taylor McKinnon 973e99f1d5 convert addToShard to use hash 2021-08-20 09:12:35 -07:00
Taylor McKinnon b4741d7382 extract stream conversion from migrate 2021-08-20 09:12:01 -07:00
35 changed files with 239 additions and 648 deletions

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateCheckpoint',
});
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting checkpoint creation'))
.then(() => task.start())
.then(() => logger.info('Checkpoint creation started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Repair',
});
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting Repair daemon'))
.then(() => task.start())
.then(() => logger.info('Repair started'));

View File

@ -30,7 +30,8 @@ ENV SENSISION_PORT 8082
ENV standalone.host 0.0.0.0
ENV standalone.port 4802
ENV standalone.home /opt/warp10
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warpscript.repository.directory /usr/local/share/warpscript/scality
ENV runner.root /usr/local/share/warpscript/runners
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'

View File

@ -1,3 +1,4 @@
/* eslint-disable no-restricted-syntax */
const schema = require('../schema');
const constants = require('../../constants');
@ -48,31 +49,27 @@ class MemoryCache {
}
async addToShard(shard, event) {
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
this._data[metricKey] = event;
if (this._shards[shard]) {
this._shards[shard].push(metricKey);
this._shards[shard][event.uuid] = event;
} else {
this._shards[shard] = [metricKey];
this._shards[shard] = { [event.uuid]: event };
}
return true;
}
async getKeysInShard(shard) {
return this._shards[shard] || [];
return this._shards[shard] || Object.keys({});
}
async fetchShard(shard) {
if (this._shards[shard]) {
return this._shards[shard].map(key => this._data[key]);
async* fetchShard(shard) {
const _shard = this._shards[shard] || {};
for (const [key, value] of Object.entries(_shard)) {
yield key;
yield value;
}
return [];
}
async deleteShardAndKeys(shard) {
(this._shards[shard] || []).forEach(key => {
delete this._data[key];
});
delete this._shards[shard];
return true;
}
@ -82,7 +79,7 @@ class MemoryCache {
}
async shardExists(shard) {
return this._shards[shard.toString()] !== undefined;
return this._shards[shard] !== undefined;
}
async updateCounters(metric) {

View File

@ -1,7 +1,8 @@
/* eslint-disable no-restricted-syntax */
const RedisClient = require('../../redis');
const schema = require('../schema');
const { LoggerContext } = require('../../utils');
const { LoggerContext, streamToAsyncIter } = require('../../utils');
const constants = require('../../constants');
const moduleLogger = new LoggerContext({
@ -60,39 +61,34 @@ class RedisCache {
const logger = moduleLogger.with({ method: 'addToShard' });
return logger
.logAsyncError(async () => {
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
logger.debug('adding metric to shard', { metricKey, shardKey });
logger.debug('adding metric to shard', { uuid: metric.uuid, shardKey });
const [setResults, saddResults] = await this._redis
.call(redis => redis
.multi([
['set', metricKey, JSON.stringify(metric.getValue())],
['sadd', shardKey, metricKey],
['hset', shardKey, metric.uuid, JSON.stringify(metric.getValue())],
['sadd', shardMasterKey, shardKey],
])
.exec());
let success = true;
if (setResults[1] !== 'OK') {
if (setResults[0] !== 1) {
moduleLogger.error('failed to set metric key', {
metricKey,
uuid: metric.uuid,
shardKey,
res: setResults[1],
res: setResults[0],
});
success = false;
return false;
}
if (saddResults[1] !== 1) {
moduleLogger.error('metric key already present in shard', {
metricKey,
moduleLogger.trace('shard key already present in master', {
shardKey,
res: saddResults[1],
});
success = false;
}
return success;
return true;
}, 'error during redis command');
}
@ -105,16 +101,22 @@ class RedisCache {
}, 'error while fetching shard keys', { shard });
}
async fetchShard(shard) {
return moduleLogger
async* fetchShard(shard) {
moduleLogger
.with({ method: 'fetchShard' })
.logAsyncError(async () => {
const keys = await this.getKeysInShard(shard);
if (!keys.length) {
return [];
.debug('fetching metrics from shard', { shard });
const shardKey = schema.getShardKey(this._prefix, shard);
const redis = this._redis._redis;
// ioredis returns the key and value as separate items
// so we need to filter and only yield the values
let isValue = false;
for await (const metric of streamToAsyncIter(redis.hscanStream(shardKey, { count: 1000 }))) {
if (isValue) {
yield JSON.parse(metric);
}
isValue = !isValue;
}
return this._redis.call(redis => redis.mget(...keys));
}, 'error while fetching shard data', { shard });
}
async deleteShardAndKeys(shard) {
@ -123,10 +125,9 @@ class RedisCache {
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
const keys = await this.getKeysInShard(shard);
return this._redis.call(
redis => redis.multi([
['del', shardKey, ...keys],
['del', shardKey],
['srem', shardMasterKey, shardKey],
]).exec(),
);

View File

@ -30,7 +30,10 @@ class CacheClient {
return true;
}
async getMetricsForShard(shard) {
// backend returns an async iterator
// no `async` keyword so it doesn't wrap it in a promise
// eslint-disable-next-line require-yield
getMetricsForShard(shard) {
return this._cacheBackend.fetchShard(shard);
}

View File

@ -1,36 +0,0 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
const { LoggerContext } = require('../utils');
const logger = new LoggerContext({
module: 'CreateCheckpoint',
});
class CreateCheckpoint extends BaseTask {
constructor(options) {
super(options);
this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs;
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/createCheckpoint',
};
return warp10.exec(params);
});
if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`);
}
}
}
module.exports = CreateCheckpoint;

View File

@ -1,11 +1,13 @@
/* eslint-disable no-restricted-globals */
/* eslint-disable no-restricted-syntax */
const assert = require('assert');
const async = require('async');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const { UtapiRecord } = require('../models');
const config = require('../config');
const { checkpointLagSecs } = require('../constants');
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
} = require('../utils');
const logger = new LoggerContext({
@ -14,6 +16,43 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
function orZero(value) {
return value || 0;
}
function _updateCheckpoint(checkpoint, metric) {
const ops = checkpoint.operations || {};
return {
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
let oldest = NaN;
let newest = NaN;
return {
update: metric => {
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value] || {};
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
oldest: () => (oldest),
newest: () => (newest),
};
}
class IngestShardTask extends BaseTask {
constructor(options) {
super(options);
@ -22,17 +61,6 @@ class IngestShardTask extends BaseTask {
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
_hydrateEvent(data, stripTimestamp = false) {
const event = JSON.parse(data);
if (this._stripEventUUID) {
delete event.uuid;
}
if (stripTimestamp) {
delete event.timestamp;
}
return new UtapiMetric(event);
}
async _execute(timestamp) {
const endShard = shardFromTimestamp(timestamp);
logger.debug('ingesting shards', { endShard });
@ -50,43 +78,50 @@ class IngestShardTask extends BaseTask {
await async.eachLimit(toIngest, 10,
async shard => {
if (await this._cache.shardExists(shard)) {
const metrics = await this._cache.getMetricsForShard(shard);
if (metrics.length > 0) {
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
const factory = _checkpointFactory(['bucket', 'account']);
for await (const metric of this._cache.getMetricsForShard(shard)) {
factory.update(metric);
}
const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
const metricClass = areSlowEvents ? 'utapi.repair.checkpoint' : 'utapi.checkpoint';
if (areSlowEvents) {
logger.info('Detected slow records, ingesting as repair');
}
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
const checkpoints = [];
const checkpointTimestamp = areSlowEvents ? now() : shard;
records.sort((a, b) => a.timestamp - b.timestamp);
const clock = new InterpolatedClock();
records.forEach(r => {
r.timestamp = clock.getTs(r.timestamp);
Object.entries(factory.checkpoints())
.forEach(([level, chkpts]) => {
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
const data = new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
});
checkpoints.push({
className: metricClass,
valueType: warp10RecordType,
labels: {
origin: config.nodeId,
[eventFieldsToWarp10[level]]: resource,
},
data,
});
});
});
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
className: metricClass,
labels: { origin: config.nodeId },
}, records,
);
return warp10.ingest(checkpoints);
});
assert.strictEqual(status, records.length);
assert.strictEqual(status, checkpoints.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
} else {
logger.debug('No events found in shard, cleaning up');
}
} else {
logger.warn('shard does not exist', { shard });
}

View File

@ -17,6 +17,7 @@ const {
now,
convertTimestamp,
comprehend,
streamToAsyncIter,
} = require('../utils');
const REDIS_CHUNKSIZE = 50;
@ -54,34 +55,9 @@ class MigrateTask extends BaseTask {
return parseInt(value, 10);
}
static async* _iterStream(stream) {
let finished = false;
let data;
stream.on('end', () => { finished = true; });
stream.pause();
while (!finished) {
data = await new Promise(resolve => {
const _resolve = jsutil.once(resolve);
const end = () => _resolve([]);
stream.once('end', end);
stream.once('data', _data => {
stream.pause();
stream.off('end', end);
_resolve(_data);
});
stream.resume();
});
for (const item of data) {
yield item;
}
}
}
async* _iterResources(level) {
const redis = this._redis._redis;
const keys = MigrateTask._iterStream(redis.scanStream({
const keys = streamToAsyncIter(redis.scanStream({
count: 100,
match: `s3:${level}:*:storageUtilized`,
}));

View File

@ -1,37 +0,0 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
const { repairLagSecs, indexedEventFields } = require('../constants');
const logger = new LoggerContext({
module: 'Repair',
});
class RepairTask extends BaseTask {
constructor(options) {
super(options);
this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs;
}
async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
const status = await this.withWarp10(warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/repairRecords',
};
return warp10.exec(params);
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
}
}
}
module.exports = RepairTask;

View File

@ -1,8 +1,6 @@
const BaseTask = require('./BaseTask');
const IngestShard = require('./IngestShard');
const CreateCheckpoint = require('./CreateCheckpoint');
const CreateSnapshot = require('./CreateSnapshot');
const RepairTask = require('./Repair');
const ReindexTask = require('./Reindex');
const MigrateTask = require('./Migrate');
const MonitorDiskUsage = require('./DiskUsage');
@ -11,9 +9,7 @@ const ManualAdjust = require('./ManualAdjust');
module.exports = {
IngestShard,
BaseTask,
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MigrateTask,
MonitorDiskUsage,

View File

@ -3,6 +3,7 @@ const shard = require('./shard');
const timestamp = require('./timestamp');
const func = require('./func');
const disk = require('./disk');
const stream = require('./stream');
module.exports = {
...log,
@ -10,4 +11,5 @@ module.exports = {
...timestamp,
...func,
...disk,
...stream,
};

View File

@ -1,7 +1,8 @@
const config = require('../config');
/**
* Returns a unix style timestamp floored to 10 second resolution
* Returns a unix style timestamp converted to a configurable resolution
* Represents a timespan of interval size ending at the returned value.
* @param {Number} timestamp - Unix timestamp with millisecond/microsecond resolution
* @returns {Number} - Unix timestamp representing beginning of shard
*/
@ -10,7 +11,8 @@ function shardFromTimestamp(timestamp) {
if (timestamp > 1000000000000000) { // handle microsecond resolution
interval = config.ingestionShardSize * 1000000;
}
return timestamp - (timestamp % interval);
return Math.ceil(timestamp / interval) * interval;
}
module.exports = {

38
libV2/utils/stream.js Normal file
View File

@ -0,0 +1,38 @@
/* eslint-disable no-restricted-syntax */
/* eslint-disable no-loop-func */
/* eslint-disable no-await-in-loop */
const { jsutil } = require('arsenal');
async function* streamToAsyncIter(stream) {
let finished = false;
let data;
stream.on('end', () => { finished = true; });
stream.pause();
while (!finished) {
data = await new Promise((resolve, reject) => {
const _resolve = jsutil.once(resolve);
const _reject = jsutil.once(reject);
const end = () => _resolve([]);
stream.once('end', end);
stream.once('error', _reject);
stream.once('data', _data => {
stream.pause();
stream.off('end', end);
stream.off('error', _reject);
_resolve(_data);
});
stream.resume();
});
for (const item of data) {
yield item;
}
}
}
module.exports = {
streamToAsyncIter,
};

View File

@ -38,9 +38,12 @@ class Warp10Client {
}
async ingest(metadata, events) {
let payload;
// If two arguments are provided
if (events !== undefined) {
const { className, valueType, labels } = metadata;
assert.notStrictEqual(className, undefined, 'you must provide a className');
const payload = events.map(
payload = events.map(
ev => this._buildGTSEntry(
className,
valueType || warp10EventType,
@ -48,8 +51,20 @@ class Warp10Client {
ev,
),
);
// If only one is provided
} else {
payload = metadata.map(
ev => this._buildGTSEntry(
ev.className,
ev.valueType || warp10EventType,
ev.labels || {},
ev.data,
),
);
}
const res = await this.update(payload);
return res.count;
}
_buildScriptEntry(params) {

View File

@ -71,9 +71,7 @@
"start": "node server.js",
"test": "mocha --recursive tests/unit",
"start_v2:task:ingest": "ENABLE_UTAPI_V2=1 node bin/ingestShards.js",
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
"start_v2:task:migrate": "ENABLE_UTAPI_V2=1 node bin/migrate.js",
"start_v2:task:disk": "ENABLE_UTAPI_V2=1 node bin/diskUsage.js",

View File

@ -0,0 +1 @@
@utapi/writeCanary

View File

@ -38,7 +38,7 @@
'utapi.snapshot.master' 'master_snapshot_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.checkpoint' 'correction_class' STORE
// Fetch latest master snapshot
$read_token $master_snapshot_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore
@ -54,6 +54,7 @@
".app"
".producer"
".owner"
"origin"
] ->SET 'ignoredLabels' STORE
// Search for available snapshots

View File

@ -87,7 +87,7 @@
$opsResults SWAP $op PUT DROP
%> FOREACH
$results $opsResults 'operations' PUT
$results $opsResults 'operations' PUT DROP
$results ->JSON
// DUP LOGMSG

View File

@ -35,11 +35,9 @@
$operation_info 'node' GET 'nodeID' STORE
$operation_info 'no_reindex' GET true == 'no_reindex' STORE
'utapi.event' 'event_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.checkpoint' 'correction_class' STORE
'utapi.repair.reindex' 'reindex_class' STORE
{} 'snapshots' STORE
@ -128,8 +126,6 @@
'end' $endTimestamp
'start' $startTimestamp 1 +
} FETCH
%> FOREACH
]
@ -153,93 +149,6 @@
] REDUCE
DROP
// 'results: ' $results ->JSON + LOGMSG
// 'loading master checkpoints' LOGMSG
// Load the most recent master checkpoint before our target timestamp from each node
{} 'masterCheckpoints' STORE
{
'token' $read_token
'class' $master_checkpoint_class
'labels' {}
'end' $endTimestamp
'count' 1
}
FETCH
<%
$masterCheckpoints SWAP
DUP LASTTICK SWAP
LABELS 'node' GET
PUT DROP
%> FOREACH
// 'loaded master checkpoints: '
// $masterCheckpoints ->JSON + LOGMSG
// $labels ->JSON LOGMSG
// Find all nodes containing events
'load_events' SECTION
[
$read_token
$event_class
{}
] FINDSETS
DROP SWAP DROP
'node' GET
<% DUP ISNULL %>
<% DROP [] %> IFT
// Load events from each node
<%
'key' STORE
// Get the timestamp of the master checkpoint for the node if it exists
// If no master checkpoint exists for a node then we can infer that no
// snapshots exists for that node as well, and we must start at 0
$masterCheckpoints $key GET
<% DUP TYPEOF 'LONG' != %>
<% DROP -1 %> IFT
'startTimestamp' STORE
{
'token' $read_token
'class' $event_class
'labels' { 'node' $key }
'end' $endTimestamp
'start' $startTimestamp 1 +
} FETCH
<% // Handle multiple GTS
VALUES
<%
@utapi/decodeEvent 'event' STORE
true 'passed' STORE
$labels KEYLIST
<%
'labelKey' STORE
<% $labels $labelKey GET $event $labelKey GET != %>
<%
false 'passed' STORE
BREAK
%> IFT
%> FOREACH
<% $passed %>
<%
$results
'objD' $event 'objD' 0 @util/getDefault @util/sumField
'sizeD' $event 'sizeD' 0 @util/getDefault @util/sumField
'inB' $event 'inB' 0 @util/getDefault @util/sumField
'outB' $event 'outB' 0 @util/getDefault @util/sumField
'ops' GET 'resultOps' STORE
$resultOps $event 'op' GET 1 @util/sumField
DROP
%> IFT
%> FOREACH
%> FOREACH
%> FOREACH
// $results ->JSON LOGMSG
// Find all nodes containing corrections
'load_corrections' SECTION
[

View File

@ -0,0 +1,36 @@
{
'name' 'utapi/writeCanary'
'desc'
<'
Write a canary record containing the current timestamp labeled with the nodeId
'>
'sig' [ [ [ ] [ ] ] ] // Signature
'params' {}
'examples' [
<'
@utapi/writeCanary
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
'nodeId' MACROCONFIG 'nodeId' STORE
NEWGTS
'utapi.canary' RENAME
{ 'nodeId' $nodeId } RELABEL
NOW NaN NaN NaN true ADDVALUE
'writeTokenStatic' UPDATE
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
$macro

View File

@ -1,160 +0,0 @@
{
'name' 'utapi/createCheckPoint'
'desc'
<'
Scans recent events and generates multiple checkpoint GTS based on the value of the provided fields
'>
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
'params' {
// Signature params description
'a' 'Map containing read/write tokens'
'o' 'Map containing operation info'
'c' 'List of created checkpoints'
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
// 'Creating checkpoints' LOGMSG
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
$operation_info 'nodeId' GET 'nodeId' STORE
{ 'node' $nodeId } 'filterLabels' STORE
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
// Grab our passed timestamp and store it as the variable `endTimestamp`
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.event' 'metric_class' STORE
$read_token $master_checkpoint_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master checkpoint
FIRSTTICK
// If we found a checkpoint, increment its timestamp
<% DUP 0 > %>
<% 1 + %> IFT
'startTimestamp' STORE // Grab our starting timestamp from the last checkpoint (0 if no checkpoints)
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
// 'Using ' $endTimestamp TOSTRING + ' as endTimestamp' + LOGMSG
{} 'results' STORE # Create an empty map for results
$fieldsToIndex
<% // For each field create an empty map in results
'field' STORE
$results {} $field PUT DROP
%> FOREACH
// Fetch all events since the last checkpoint
{
'token' $read_token
'class' $metric_class
'labels' $filterLabels
'start' $startTimestamp
'end' $endTimestamp
} FETCH
<% DUP SIZE 0 > %>
<%
<%
<% // Iter over events
'event' STORE // Store the event
$event 4 GET @utapi/decodeEvent 'decoded' STORE
// 'Including event ' $event 0 GET TOSTRING + ' ' + $decoded ->JSON + LOGMSG
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
$decoded 'op' GET 'operationId' STORE
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
<%
'field' STORE // Store the field
$decoded $field GET 'fieldValue' STORE // Store the fields value
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
<%
$fieldResults $fieldValue GET // Grab the existing map for the checkpoint and leave it on the stack
%>
<%
// Push empty checkpoint onto stack
{
'objD' 0
'sizeD' 0
'inB' 0
'outB' 0
'ops' {}
} 'fieldValueResults' STORE
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
$fieldValueResults // Leave it on the stack
%> IFTE
// Consumes a map off the stack summing the specified field and the passed value
// Leaves the modified map on the stack
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
// Grab our op count map
'ops' GET 'opsCount' STORE
$opsCount $operationId 1 @util/sumField
DROP // Drop the returned map from sumField
%> FOREACH
%> FOREACH
%> FOREACH
%>
<%
DROP 0 STOP
%> IFTE
0 'checkpoints' STORE
// For each of our indexed fields
$results KEYLIST
<%
'field' STORE
$results $field GET 'fieldResults' STORE
// For each unique value seen
$fieldResults KEYLIST
<%
'fieldValue' STORE
// Encode the results
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
// 'Created checkpoint ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
// And create our GTS
NEWGTS $checkpoint_class RENAME
$endTimestamp NaN NaN NaN $value ADDVALUE
{ 'node' $nodeId $field $fieldValue } RELABEL
$write_token UPDATE
$checkpoints 1 + 'checkpoints' STORE
%> FOREACH
%> FOREACH
NEWGTS $master_checkpoint_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
$checkpoints // Leave the number of created checkpoints on the stack
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
$macro

View File

@ -1,157 +0,0 @@
{
'name' 'utapi/repairRecord'
'desc'
<'
'>
'sig' [ [ [ ] [ ] ] ] // Signature
'params' {
// Signature params description
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
// 'Checking for repairs' LOGMSG
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
$operation_info 'nodeId' GET 'nodeId' STORE
{ 'node' $nodeId } 'filterLabels' STORE
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
// Grab our passed timestamp and store it as the variable `endTimestamp`
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
'utapi.repair.master' 'master_repair_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.event' 'metric_class' STORE
$read_token $master_repair_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master repair
FIRSTTICK
// If we found a repair, increment its timestamp so we start at the tick immediatly after
<% DUP 0 > %>
<% 1 + %> IFT
'startTimestamp' STORE // Grab our starting timestamp from the last repair (0 if no repairs)
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
{} 'results' STORE # Create an empty map for results
$fieldsToIndex
<% // For each field create an empty map in results
'field' STORE
$results {} $field PUT DROP
%> FOREACH
// Fetch all events since the last repair
{
'token' $read_token
'class' $metric_class
'labels' $filterLabels
'start' $startTimestamp
'end' $endTimestamp
} FETCH
// STOP
<% DUP SIZE 0 > %>
<%
0 GET
<%
'event' STORE
$event 4 GET @utapi/decodeEvent 'decoded' STORE
// 'Including event ' $decoded 'id' GET + LOGMSG
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
$decoded 'op' GET 'operationId' STORE
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
<%
'field' STORE // Store the field
$decoded $field GET 'fieldValue' STORE // Store the fields value
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
<%
$fieldResults $fieldValue GET // Grab the existing map for the correction and leave it on the stack
%>
<%
// Push empty correction onto stack
{
'objD' 0
'sizeD' 0
'inB' 0
'outB' 0
'ops' {}
} 'fieldValueResults' STORE
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
$fieldValueResults // Leave it on the stack
%> IFTE
// Consumes a map off the stack summing the specified field and the passed value
// Leaves the modified map on the stack
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
// Grab our op count map
'ops' GET 'opsCount' STORE
$opsCount $operationId 1 @util/sumField
DROP // Drop the returned map from sumField
%> FOREACH
%> FOREACH
%>
<%
DROP 0 STOP
%> IFTE
0 'corrections' STORE
$results KEYLIST
<%
'field' STORE
$results $field GET 'fieldResults' STORE
// For each unique value seen
$fieldResults KEYLIST
<%
'fieldValue' STORE
// Encode the results
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
// 'Created correction ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
// And create our GTS
NEWGTS $correction_class RENAME
$endTimestamp NaN NaN NaN $value ADDVALUE
{ 'node' $nodeId $field $fieldValue } RELABEL
$write_token UPDATE
$corrections 1 + 'corrections' STORE
%> FOREACH
%> FOREACH
NEWGTS $master_repair_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
$corrections // Leave the number of created corrections on the stack
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
// Unit tests
$macro