Compare commits

..

10 Commits

Author SHA1 Message Date
Taylor McKinnon 071cbd03d0 update Dockerfile for new warpscript paths 2021-09-02 09:45:34 -07:00
Taylor McKinnon a09fce2467 cleanup and reorganize warpscript 2021-09-02 09:45:27 -07:00
Taylor McKinnon d0a9a32cb1 remove events from metric caculation 2021-08-20 12:33:28 -07:00
Taylor McKinnon 5dd3a7f68b fix snapshots 2021-08-20 12:31:13 -07:00
Taylor McKinnon 41935ac187 remove old tasks 2021-08-20 12:30:44 -07:00
Taylor McKinnon 84366f98ad make snapshots include repair checkpoints 2021-08-20 11:45:06 -07:00
Taylor McKinnon 802434f312 remove uneeded repair task 2021-08-20 11:39:39 -07:00
Taylor McKinnon 74ccd4e8ca remove uneeded checkpoint task 2021-08-20 11:39:14 -07:00
Taylor McKinnon dec8f68ae3 implement checkpoint calculation 2021-08-20 11:35:21 -07:00
Taylor McKinnon e13e428dec allow mixed classes and labels in warp10 ingest 2021-08-20 11:34:54 -07:00
29 changed files with 149 additions and 578 deletions

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateCheckpoint',
});
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting checkpoint creation'))
.then(() => task.start())
.then(() => logger.info('Checkpoint creation started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Repair',
});
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting Repair daemon'))
.then(() => task.start())
.then(() => logger.info('Repair started'));

View File

@ -30,7 +30,8 @@ ENV SENSISION_PORT 8082
ENV standalone.host 0.0.0.0
ENV standalone.port 4802
ENV standalone.home /opt/warp10
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warpscript.repository.directory /usr/local/share/warpscript/scality
ENV runner.root /usr/local/share/warpscript/runners
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'

View File

@ -1,36 +0,0 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
const { LoggerContext } = require('../utils');
const logger = new LoggerContext({
module: 'CreateCheckpoint',
});
class CreateCheckpoint extends BaseTask {
constructor(options) {
super(options);
this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs;
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/createCheckpoint',
};
return warp10.exec(params);
});
if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`);
}
}
}
module.exports = CreateCheckpoint;

View File

@ -1,12 +1,13 @@
/* eslint-disable no-restricted-globals */
/* eslint-disable no-restricted-syntax */
const assert = require('assert');
const async = require('async');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const { UtapiRecord } = require('../models');
const config = require('../config');
const { checkpointLagSecs } = require('../constants');
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
} = require('../utils');
const logger = new LoggerContext({
@ -15,6 +16,43 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
function orZero(value) {
return value || 0;
}
function _updateCheckpoint(checkpoint, metric) {
const ops = checkpoint.operations || {};
return {
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
let oldest = NaN;
let newest = NaN;
return {
update: metric => {
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value] || {};
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
oldest: () => (oldest),
newest: () => (newest),
};
}
class IngestShardTask extends BaseTask {
constructor(options) {
super(options);
@ -23,16 +61,6 @@ class IngestShardTask extends BaseTask {
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
_hydrateEvent(event, stripTimestamp = false) {
if (this._stripEventUUID) {
delete event.uuid;
}
if (stripTimestamp) {
delete event.timestamp;
}
return new UtapiMetric(event);
}
async _execute(timestamp) {
const endShard = shardFromTimestamp(timestamp);
logger.debug('ingesting shards', { endShard });
@ -50,46 +78,50 @@ class IngestShardTask extends BaseTask {
await async.eachLimit(toIngest, 10,
async shard => {
if (await this._cache.shardExists(shard)) {
const metrics = [];
const factory = _checkpointFactory(['bucket', 'account']);
for await (const metric of this._cache.getMetricsForShard(shard)) {
metrics.push(metric);
factory.update(metric);
}
if (metrics.length > 0) {
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
if (areSlowEvents) {
logger.info('Detected slow records, ingesting as repair');
}
const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.checkpoint' : 'utapi.checkpoint';
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
if (areSlowEvents) {
logger.info('Detected slow records, ingesting as repair');
}
records.sort((a, b) => a.timestamp - b.timestamp);
const checkpoints = [];
const checkpointTimestamp = areSlowEvents ? now() : shard;
const clock = new InterpolatedClock();
records.forEach(r => {
r.timestamp = clock.getTs(r.timestamp);
});
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
Object.entries(factory.checkpoints())
.forEach(([level, chkpts]) => {
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
const data = new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
});
checkpoints.push({
className: metricClass,
labels: { origin: config.nodeId },
}, records,
);
valueType: warp10RecordType,
labels: {
origin: config.nodeId,
[eventFieldsToWarp10[level]]: resource,
},
data,
});
});
});
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
} else {
logger.debug('No events found in shard, cleaning up');
}
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(checkpoints);
});
assert.strictEqual(status, checkpoints.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
} else {
logger.warn('shard does not exist', { shard });
}

View File

@ -1,37 +0,0 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
const { repairLagSecs, indexedEventFields } = require('../constants');
const logger = new LoggerContext({
module: 'Repair',
});
class RepairTask extends BaseTask {
constructor(options) {
super(options);
this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs;
}
async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
const status = await this.withWarp10(warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/repairRecords',
};
return warp10.exec(params);
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
}
}
}
module.exports = RepairTask;

View File

@ -1,8 +1,6 @@
const BaseTask = require('./BaseTask');
const IngestShard = require('./IngestShard');
const CreateCheckpoint = require('./CreateCheckpoint');
const CreateSnapshot = require('./CreateSnapshot');
const RepairTask = require('./Repair');
const ReindexTask = require('./Reindex');
const MigrateTask = require('./Migrate');
const MonitorDiskUsage = require('./DiskUsage');
@ -11,9 +9,7 @@ const ManualAdjust = require('./ManualAdjust');
module.exports = {
IngestShard,
BaseTask,
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MigrateTask,
MonitorDiskUsage,

View File

@ -1,7 +1,8 @@
const config = require('../config');
/**
* Returns a unix style timestamp floored to 10 second resolution
* Returns a unix style timestamp converted to a configurable resolution
* Represents a timespan of interval size ending at the returned value.
* @param {Number} timestamp - Unix timestamp with millisecond/microsecond resolution
* @returns {Number} - Unix timestamp representing beginning of shard
*/
@ -10,7 +11,8 @@ function shardFromTimestamp(timestamp) {
if (timestamp > 1000000000000000) { // handle microsecond resolution
interval = config.ingestionShardSize * 1000000;
}
return timestamp - (timestamp % interval);
return Math.ceil(timestamp / interval) * interval;
}
module.exports = {

View File

@ -38,18 +38,33 @@ class Warp10Client {
}
async ingest(metadata, events) {
const { className, valueType, labels } = metadata;
assert.notStrictEqual(className, undefined, 'you must provide a className');
const payload = events.map(
ev => this._buildGTSEntry(
className,
valueType || warp10EventType,
labels || {},
ev,
),
);
let payload;
// If two arguments are provided
if (events !== undefined) {
const { className, valueType, labels } = metadata;
assert.notStrictEqual(className, undefined, 'you must provide a className');
payload = events.map(
ev => this._buildGTSEntry(
className,
valueType || warp10EventType,
labels || {},
ev,
),
);
// If only one is provided
} else {
payload = metadata.map(
ev => this._buildGTSEntry(
ev.className,
ev.valueType || warp10EventType,
ev.labels || {},
ev.data,
),
);
}
const res = await this.update(payload);
return res.count;
}
_buildScriptEntry(params) {

View File

@ -71,9 +71,7 @@
"start": "node server.js",
"test": "mocha --recursive tests/unit",
"start_v2:task:ingest": "ENABLE_UTAPI_V2=1 node bin/ingestShards.js",
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
"start_v2:task:migrate": "ENABLE_UTAPI_V2=1 node bin/migrate.js",
"start_v2:task:disk": "ENABLE_UTAPI_V2=1 node bin/diskUsage.js",

View File

@ -0,0 +1 @@
@utapi/writeCanary

View File

@ -38,7 +38,7 @@
'utapi.snapshot.master' 'master_snapshot_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.checkpoint' 'correction_class' STORE
// Fetch latest master snapshot
$read_token $master_snapshot_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore
@ -54,6 +54,7 @@
".app"
".producer"
".owner"
"origin"
] ->SET 'ignoredLabels' STORE
// Search for available snapshots

View File

@ -87,7 +87,7 @@
$opsResults SWAP $op PUT DROP
%> FOREACH
$results $opsResults 'operations' PUT
$results $opsResults 'operations' PUT DROP
$results ->JSON
// DUP LOGMSG

View File

@ -35,11 +35,9 @@
$operation_info 'node' GET 'nodeID' STORE
$operation_info 'no_reindex' GET true == 'no_reindex' STORE
'utapi.event' 'event_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.checkpoint' 'correction_class' STORE
'utapi.repair.reindex' 'reindex_class' STORE
{} 'snapshots' STORE
@ -128,8 +126,6 @@
'end' $endTimestamp
'start' $startTimestamp 1 +
} FETCH
%> FOREACH
]
@ -153,93 +149,6 @@
] REDUCE
DROP
// 'results: ' $results ->JSON + LOGMSG
// 'loading master checkpoints' LOGMSG
// Load the most recent master checkpoint before our target timestamp from each node
{} 'masterCheckpoints' STORE
{
'token' $read_token
'class' $master_checkpoint_class
'labels' {}
'end' $endTimestamp
'count' 1
}
FETCH
<%
$masterCheckpoints SWAP
DUP LASTTICK SWAP
LABELS 'node' GET
PUT DROP
%> FOREACH
// 'loaded master checkpoints: '
// $masterCheckpoints ->JSON + LOGMSG
// $labels ->JSON LOGMSG
// Find all nodes containing events
'load_events' SECTION
[
$read_token
$event_class
{}
] FINDSETS
DROP SWAP DROP
'node' GET
<% DUP ISNULL %>
<% DROP [] %> IFT
// Load events from each node
<%
'key' STORE
// Get the timestamp of the master checkpoint for the node if it exists
// If no master checkpoint exists for a node then we can infer that no
// snapshots exists for that node as well, and we must start at 0
$masterCheckpoints $key GET
<% DUP TYPEOF 'LONG' != %>
<% DROP -1 %> IFT
'startTimestamp' STORE
{
'token' $read_token
'class' $event_class
'labels' { 'node' $key }
'end' $endTimestamp
'start' $startTimestamp 1 +
} FETCH
<% // Handle multiple GTS
VALUES
<%
@utapi/decodeEvent 'event' STORE
true 'passed' STORE
$labels KEYLIST
<%
'labelKey' STORE
<% $labels $labelKey GET $event $labelKey GET != %>
<%
false 'passed' STORE
BREAK
%> IFT
%> FOREACH
<% $passed %>
<%
$results
'objD' $event 'objD' 0 @util/getDefault @util/sumField
'sizeD' $event 'sizeD' 0 @util/getDefault @util/sumField
'inB' $event 'inB' 0 @util/getDefault @util/sumField
'outB' $event 'outB' 0 @util/getDefault @util/sumField
'ops' GET 'resultOps' STORE
$resultOps $event 'op' GET 1 @util/sumField
DROP
%> IFT
%> FOREACH
%> FOREACH
%> FOREACH
// $results ->JSON LOGMSG
// Find all nodes containing corrections
'load_corrections' SECTION
[

View File

@ -0,0 +1,36 @@
{
'name' 'utapi/writeCanary'
'desc'
<'
Write a canary record containing the current timestamp labeled with the nodeId
'>
'sig' [ [ [ ] [ ] ] ] // Signature
'params' {}
'examples' [
<'
@utapi/writeCanary
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
'nodeId' MACROCONFIG 'nodeId' STORE
NEWGTS
'utapi.canary' RENAME
{ 'nodeId' $nodeId } RELABEL
NOW NaN NaN NaN true ADDVALUE
'writeTokenStatic' UPDATE
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
$macro

View File

@ -1,160 +0,0 @@
{
'name' 'utapi/createCheckPoint'
'desc'
<'
Scans recent events and generates multiple checkpoint GTS based on the value of the provided fields
'>
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
'params' {
// Signature params description
'a' 'Map containing read/write tokens'
'o' 'Map containing operation info'
'c' 'List of created checkpoints'
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
// 'Creating checkpoints' LOGMSG
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
$operation_info 'nodeId' GET 'nodeId' STORE
{ 'node' $nodeId } 'filterLabels' STORE
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
// Grab our passed timestamp and store it as the variable `endTimestamp`
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.event' 'metric_class' STORE
$read_token $master_checkpoint_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master checkpoint
FIRSTTICK
// If we found a checkpoint, increment its timestamp
<% DUP 0 > %>
<% 1 + %> IFT
'startTimestamp' STORE // Grab our starting timestamp from the last checkpoint (0 if no checkpoints)
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
// 'Using ' $endTimestamp TOSTRING + ' as endTimestamp' + LOGMSG
{} 'results' STORE # Create an empty map for results
$fieldsToIndex
<% // For each field create an empty map in results
'field' STORE
$results {} $field PUT DROP
%> FOREACH
// Fetch all events since the last checkpoint
{
'token' $read_token
'class' $metric_class
'labels' $filterLabels
'start' $startTimestamp
'end' $endTimestamp
} FETCH
<% DUP SIZE 0 > %>
<%
<%
<% // Iter over events
'event' STORE // Store the event
$event 4 GET @utapi/decodeEvent 'decoded' STORE
// 'Including event ' $event 0 GET TOSTRING + ' ' + $decoded ->JSON + LOGMSG
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
$decoded 'op' GET 'operationId' STORE
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
<%
'field' STORE // Store the field
$decoded $field GET 'fieldValue' STORE // Store the fields value
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
<%
$fieldResults $fieldValue GET // Grab the existing map for the checkpoint and leave it on the stack
%>
<%
// Push empty checkpoint onto stack
{
'objD' 0
'sizeD' 0
'inB' 0
'outB' 0
'ops' {}
} 'fieldValueResults' STORE
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
$fieldValueResults // Leave it on the stack
%> IFTE
// Consumes a map off the stack summing the specified field and the passed value
// Leaves the modified map on the stack
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
// Grab our op count map
'ops' GET 'opsCount' STORE
$opsCount $operationId 1 @util/sumField
DROP // Drop the returned map from sumField
%> FOREACH
%> FOREACH
%> FOREACH
%>
<%
DROP 0 STOP
%> IFTE
0 'checkpoints' STORE
// For each of our indexed fields
$results KEYLIST
<%
'field' STORE
$results $field GET 'fieldResults' STORE
// For each unique value seen
$fieldResults KEYLIST
<%
'fieldValue' STORE
// Encode the results
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
// 'Created checkpoint ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
// And create our GTS
NEWGTS $checkpoint_class RENAME
$endTimestamp NaN NaN NaN $value ADDVALUE
{ 'node' $nodeId $field $fieldValue } RELABEL
$write_token UPDATE
$checkpoints 1 + 'checkpoints' STORE
%> FOREACH
%> FOREACH
NEWGTS $master_checkpoint_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
$checkpoints // Leave the number of created checkpoints on the stack
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
$macro

View File

@ -1,157 +0,0 @@
{
'name' 'utapi/repairRecord'
'desc'
<'
'>
'sig' [ [ [ ] [ ] ] ] // Signature
'params' {
// Signature params description
}
'examples' [
<'
'>
]
} 'info' STORE
<%
!$info INFO
SAVE 'context' STORE
<%
// 'Checking for repairs' LOGMSG
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
$auth_info 'write' GET 'write_token' STORE
$auth_info 'read' GET 'read_token' STORE
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
$operation_info 'nodeId' GET 'nodeId' STORE
{ 'node' $nodeId } 'filterLabels' STORE
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
// Grab our passed timestamp and store it as the variable `endTimestamp`
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
'utapi.repair.master' 'master_repair_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.event' 'metric_class' STORE
$read_token $master_repair_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master repair
FIRSTTICK
// If we found a repair, increment its timestamp so we start at the tick immediatly after
<% DUP 0 > %>
<% 1 + %> IFT
'startTimestamp' STORE // Grab our starting timestamp from the last repair (0 if no repairs)
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
{} 'results' STORE # Create an empty map for results
$fieldsToIndex
<% // For each field create an empty map in results
'field' STORE
$results {} $field PUT DROP
%> FOREACH
// Fetch all events since the last repair
{
'token' $read_token
'class' $metric_class
'labels' $filterLabels
'start' $startTimestamp
'end' $endTimestamp
} FETCH
// STOP
<% DUP SIZE 0 > %>
<%
0 GET
<%
'event' STORE
$event 4 GET @utapi/decodeEvent 'decoded' STORE
// 'Including event ' $decoded 'id' GET + LOGMSG
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
$decoded 'op' GET 'operationId' STORE
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
<%
'field' STORE // Store the field
$decoded $field GET 'fieldValue' STORE // Store the fields value
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
<%
$fieldResults $fieldValue GET // Grab the existing map for the correction and leave it on the stack
%>
<%
// Push empty correction onto stack
{
'objD' 0
'sizeD' 0
'inB' 0
'outB' 0
'ops' {}
} 'fieldValueResults' STORE
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
$fieldValueResults // Leave it on the stack
%> IFTE
// Consumes a map off the stack summing the specified field and the passed value
// Leaves the modified map on the stack
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
// Grab our op count map
'ops' GET 'opsCount' STORE
$opsCount $operationId 1 @util/sumField
DROP // Drop the returned map from sumField
%> FOREACH
%> FOREACH
%>
<%
DROP 0 STOP
%> IFTE
0 'corrections' STORE
$results KEYLIST
<%
'field' STORE
$results $field GET 'fieldResults' STORE
// For each unique value seen
$fieldResults KEYLIST
<%
'fieldValue' STORE
// Encode the results
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
// 'Created correction ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
// And create our GTS
NEWGTS $correction_class RENAME
$endTimestamp NaN NaN NaN $value ADDVALUE
{ 'node' $nodeId $field $fieldValue } RELABEL
$write_token UPDATE
$corrections 1 + 'corrections' STORE
%> FOREACH
%> FOREACH
NEWGTS $master_repair_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
$corrections // Leave the number of created corrections on the stack
%>
<% // catch any exception
RETHROW
%>
<% // finally, restore the context
$context RESTORE
%> TRY
%>
'macro' STORE
// Unit tests
$macro