Compare commits
15 Commits
developmen
...
improvemen
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 071cbd03d0 | |
Taylor McKinnon | a09fce2467 | |
Taylor McKinnon | d0a9a32cb1 | |
Taylor McKinnon | 5dd3a7f68b | |
Taylor McKinnon | 41935ac187 | |
Taylor McKinnon | 84366f98ad | |
Taylor McKinnon | 802434f312 | |
Taylor McKinnon | 74ccd4e8ca | |
Taylor McKinnon | dec8f68ae3 | |
Taylor McKinnon | e13e428dec | |
Taylor McKinnon | 034baca47a | |
Taylor McKinnon | 3da9b8642e | |
Taylor McKinnon | 26b7dc5284 | |
Taylor McKinnon | 973e99f1d5 | |
Taylor McKinnon | b4741d7382 |
|
@ -1,15 +0,0 @@
|
||||||
const { tasks } = require('..');
|
|
||||||
const { LoggerContext } = require('../libV2/utils');
|
|
||||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
task: 'CreateCheckpoint',
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
|
|
||||||
|
|
||||||
task.setup()
|
|
||||||
.then(() => logger.info('Starting checkpoint creation'))
|
|
||||||
.then(() => task.start())
|
|
||||||
.then(() => logger.info('Checkpoint creation started'));
|
|
|
@ -1,15 +0,0 @@
|
||||||
const { tasks } = require('..');
|
|
||||||
const { LoggerContext } = require('../libV2/utils');
|
|
||||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
task: 'Repair',
|
|
||||||
});
|
|
||||||
|
|
||||||
|
|
||||||
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
|
|
||||||
|
|
||||||
task.setup()
|
|
||||||
.then(() => logger.info('Starting Repair daemon'))
|
|
||||||
.then(() => task.start())
|
|
||||||
.then(() => logger.info('Repair started'));
|
|
|
@ -30,7 +30,8 @@ ENV SENSISION_PORT 8082
|
||||||
ENV standalone.host 0.0.0.0
|
ENV standalone.host 0.0.0.0
|
||||||
ENV standalone.port 4802
|
ENV standalone.port 4802
|
||||||
ENV standalone.home /opt/warp10
|
ENV standalone.home /opt/warp10
|
||||||
ENV warpscript.repository.directory /usr/local/share/warpscript
|
ENV warpscript.repository.directory /usr/local/share/warpscript/scality
|
||||||
|
ENV runner.root /usr/local/share/warpscript/runners
|
||||||
ENV warp.token.file /static.tokens
|
ENV warp.token.file /static.tokens
|
||||||
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
||||||
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
const schema = require('../schema');
|
const schema = require('../schema');
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
|
|
||||||
|
@ -48,31 +49,27 @@ class MemoryCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
async addToShard(shard, event) {
|
async addToShard(shard, event) {
|
||||||
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
|
|
||||||
this._data[metricKey] = event;
|
|
||||||
if (this._shards[shard]) {
|
if (this._shards[shard]) {
|
||||||
this._shards[shard].push(metricKey);
|
this._shards[shard][event.uuid] = event;
|
||||||
} else {
|
} else {
|
||||||
this._shards[shard] = [metricKey];
|
this._shards[shard] = { [event.uuid]: event };
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async getKeysInShard(shard) {
|
async getKeysInShard(shard) {
|
||||||
return this._shards[shard] || [];
|
return this._shards[shard] || Object.keys({});
|
||||||
}
|
}
|
||||||
|
|
||||||
async fetchShard(shard) {
|
async* fetchShard(shard) {
|
||||||
if (this._shards[shard]) {
|
const _shard = this._shards[shard] || {};
|
||||||
return this._shards[shard].map(key => this._data[key]);
|
for (const [key, value] of Object.entries(_shard)) {
|
||||||
|
yield key;
|
||||||
|
yield value;
|
||||||
}
|
}
|
||||||
return [];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async deleteShardAndKeys(shard) {
|
async deleteShardAndKeys(shard) {
|
||||||
(this._shards[shard] || []).forEach(key => {
|
|
||||||
delete this._data[key];
|
|
||||||
});
|
|
||||||
delete this._shards[shard];
|
delete this._shards[shard];
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -82,7 +79,7 @@ class MemoryCache {
|
||||||
}
|
}
|
||||||
|
|
||||||
async shardExists(shard) {
|
async shardExists(shard) {
|
||||||
return this._shards[shard.toString()] !== undefined;
|
return this._shards[shard] !== undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
async updateCounters(metric) {
|
async updateCounters(metric) {
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
const RedisClient = require('../../redis');
|
const RedisClient = require('../../redis');
|
||||||
const schema = require('../schema');
|
const schema = require('../schema');
|
||||||
|
|
||||||
const { LoggerContext } = require('../../utils');
|
const { LoggerContext, streamToAsyncIter } = require('../../utils');
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
|
|
||||||
const moduleLogger = new LoggerContext({
|
const moduleLogger = new LoggerContext({
|
||||||
|
@ -60,39 +61,34 @@ class RedisCache {
|
||||||
const logger = moduleLogger.with({ method: 'addToShard' });
|
const logger = moduleLogger.with({ method: 'addToShard' });
|
||||||
return logger
|
return logger
|
||||||
.logAsyncError(async () => {
|
.logAsyncError(async () => {
|
||||||
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
|
|
||||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||||
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
||||||
logger.debug('adding metric to shard', { metricKey, shardKey });
|
logger.debug('adding metric to shard', { uuid: metric.uuid, shardKey });
|
||||||
|
|
||||||
const [setResults, saddResults] = await this._redis
|
const [setResults, saddResults] = await this._redis
|
||||||
.call(redis => redis
|
.call(redis => redis
|
||||||
.multi([
|
.multi([
|
||||||
['set', metricKey, JSON.stringify(metric.getValue())],
|
['hset', shardKey, metric.uuid, JSON.stringify(metric.getValue())],
|
||||||
['sadd', shardKey, metricKey],
|
|
||||||
['sadd', shardMasterKey, shardKey],
|
['sadd', shardMasterKey, shardKey],
|
||||||
])
|
])
|
||||||
.exec());
|
.exec());
|
||||||
|
|
||||||
let success = true;
|
if (setResults[0] !== 1) {
|
||||||
if (setResults[1] !== 'OK') {
|
|
||||||
moduleLogger.error('failed to set metric key', {
|
moduleLogger.error('failed to set metric key', {
|
||||||
metricKey,
|
uuid: metric.uuid,
|
||||||
shardKey,
|
shardKey,
|
||||||
res: setResults[1],
|
res: setResults[0],
|
||||||
});
|
});
|
||||||
success = false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (saddResults[1] !== 1) {
|
if (saddResults[1] !== 1) {
|
||||||
moduleLogger.error('metric key already present in shard', {
|
moduleLogger.trace('shard key already present in master', {
|
||||||
metricKey,
|
|
||||||
shardKey,
|
shardKey,
|
||||||
res: saddResults[1],
|
res: saddResults[1],
|
||||||
});
|
});
|
||||||
success = false;
|
|
||||||
}
|
}
|
||||||
return success;
|
return true;
|
||||||
}, 'error during redis command');
|
}, 'error during redis command');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,16 +101,22 @@ class RedisCache {
|
||||||
}, 'error while fetching shard keys', { shard });
|
}, 'error while fetching shard keys', { shard });
|
||||||
}
|
}
|
||||||
|
|
||||||
async fetchShard(shard) {
|
async* fetchShard(shard) {
|
||||||
return moduleLogger
|
moduleLogger
|
||||||
.with({ method: 'fetchShard' })
|
.with({ method: 'fetchShard' })
|
||||||
.logAsyncError(async () => {
|
.debug('fetching metrics from shard', { shard });
|
||||||
const keys = await this.getKeysInShard(shard);
|
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||||
if (!keys.length) {
|
const redis = this._redis._redis;
|
||||||
return [];
|
|
||||||
}
|
// ioredis returns the key and value as separate items
|
||||||
return this._redis.call(redis => redis.mget(...keys));
|
// so we need to filter and only yield the values
|
||||||
}, 'error while fetching shard data', { shard });
|
let isValue = false;
|
||||||
|
for await (const metric of streamToAsyncIter(redis.hscanStream(shardKey, { count: 1000 }))) {
|
||||||
|
if (isValue) {
|
||||||
|
yield JSON.parse(metric);
|
||||||
|
}
|
||||||
|
isValue = !isValue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async deleteShardAndKeys(shard) {
|
async deleteShardAndKeys(shard) {
|
||||||
|
@ -123,10 +125,9 @@ class RedisCache {
|
||||||
.logAsyncError(async () => {
|
.logAsyncError(async () => {
|
||||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||||
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
||||||
const keys = await this.getKeysInShard(shard);
|
|
||||||
return this._redis.call(
|
return this._redis.call(
|
||||||
redis => redis.multi([
|
redis => redis.multi([
|
||||||
['del', shardKey, ...keys],
|
['del', shardKey],
|
||||||
['srem', shardMasterKey, shardKey],
|
['srem', shardMasterKey, shardKey],
|
||||||
]).exec(),
|
]).exec(),
|
||||||
);
|
);
|
||||||
|
|
|
@ -30,7 +30,10 @@ class CacheClient {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
async getMetricsForShard(shard) {
|
// backend returns an async iterator
|
||||||
|
// no `async` keyword so it doesn't wrap it in a promise
|
||||||
|
// eslint-disable-next-line require-yield
|
||||||
|
getMetricsForShard(shard) {
|
||||||
return this._cacheBackend.fetchShard(shard);
|
return this._cacheBackend.fetchShard(shard);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
const BaseTask = require('./BaseTask');
|
|
||||||
const config = require('../config');
|
|
||||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
|
||||||
const { LoggerContext } = require('../utils');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
module: 'CreateCheckpoint',
|
|
||||||
});
|
|
||||||
|
|
||||||
class CreateCheckpoint extends BaseTask {
|
|
||||||
constructor(options) {
|
|
||||||
super(options);
|
|
||||||
this._defaultSchedule = config.checkpointSchedule;
|
|
||||||
this._defaultLag = checkpointLagSecs;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
|
||||||
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
|
||||||
const status = await this.withWarp10(async warp10 => {
|
|
||||||
const params = {
|
|
||||||
params: {
|
|
||||||
nodeId: warp10.nodeId,
|
|
||||||
end: timestamp.toString(),
|
|
||||||
fields: indexedEventFields,
|
|
||||||
},
|
|
||||||
macro: 'utapi/createCheckpoint',
|
|
||||||
};
|
|
||||||
return warp10.exec(params);
|
|
||||||
});
|
|
||||||
if (status.result[0]) {
|
|
||||||
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = CreateCheckpoint;
|
|
|
@ -1,11 +1,13 @@
|
||||||
|
/* eslint-disable no-restricted-globals */
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const { UtapiMetric } = require('../models');
|
const { UtapiRecord } = require('../models');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { checkpointLagSecs } = require('../constants');
|
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
|
||||||
const {
|
const {
|
||||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
|
||||||
} = require('../utils');
|
} = require('../utils');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
|
@ -14,6 +16,43 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||||
|
|
||||||
|
function orZero(value) {
|
||||||
|
return value || 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function _updateCheckpoint(checkpoint, metric) {
|
||||||
|
const ops = checkpoint.operations || {};
|
||||||
|
return {
|
||||||
|
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
|
||||||
|
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
|
||||||
|
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
|
||||||
|
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
|
||||||
|
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function _checkpointFactory(labels) {
|
||||||
|
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
|
||||||
|
let oldest = NaN;
|
||||||
|
let newest = NaN;
|
||||||
|
return {
|
||||||
|
update: metric => {
|
||||||
|
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
|
||||||
|
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
|
||||||
|
labels
|
||||||
|
.filter(label => !!metric[label])
|
||||||
|
.forEach(label => {
|
||||||
|
const value = metric[label];
|
||||||
|
const checkpoint = checkpoints[label][value] || {};
|
||||||
|
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
checkpoints: () => (checkpoints),
|
||||||
|
oldest: () => (oldest),
|
||||||
|
newest: () => (newest),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
class IngestShardTask extends BaseTask {
|
class IngestShardTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super(options);
|
||||||
|
@ -22,17 +61,6 @@ class IngestShardTask extends BaseTask {
|
||||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
_hydrateEvent(data, stripTimestamp = false) {
|
|
||||||
const event = JSON.parse(data);
|
|
||||||
if (this._stripEventUUID) {
|
|
||||||
delete event.uuid;
|
|
||||||
}
|
|
||||||
if (stripTimestamp) {
|
|
||||||
delete event.timestamp;
|
|
||||||
}
|
|
||||||
return new UtapiMetric(event);
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
const endShard = shardFromTimestamp(timestamp);
|
const endShard = shardFromTimestamp(timestamp);
|
||||||
logger.debug('ingesting shards', { endShard });
|
logger.debug('ingesting shards', { endShard });
|
||||||
|
@ -50,43 +78,50 @@ class IngestShardTask extends BaseTask {
|
||||||
await async.eachLimit(toIngest, 10,
|
await async.eachLimit(toIngest, 10,
|
||||||
async shard => {
|
async shard => {
|
||||||
if (await this._cache.shardExists(shard)) {
|
if (await this._cache.shardExists(shard)) {
|
||||||
const metrics = await this._cache.getMetricsForShard(shard);
|
const factory = _checkpointFactory(['bucket', 'account']);
|
||||||
if (metrics.length > 0) {
|
for await (const metric of this._cache.getMetricsForShard(shard)) {
|
||||||
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
|
factory.update(metric);
|
||||||
const shardAge = now() - shard;
|
|
||||||
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
|
|
||||||
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
|
|
||||||
|
|
||||||
if (areSlowEvents) {
|
|
||||||
logger.info('Detected slow records, ingesting as repair');
|
|
||||||
}
|
|
||||||
|
|
||||||
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
|
|
||||||
|
|
||||||
records.sort((a, b) => a.timestamp - b.timestamp);
|
|
||||||
|
|
||||||
const clock = new InterpolatedClock();
|
|
||||||
records.forEach(r => {
|
|
||||||
r.timestamp = clock.getTs(r.timestamp);
|
|
||||||
});
|
|
||||||
|
|
||||||
let ingestedIntoNodeId;
|
|
||||||
const status = await this.withWarp10(async warp10 => {
|
|
||||||
// eslint-disable-next-line prefer-destructuring
|
|
||||||
ingestedIntoNodeId = warp10.nodeId;
|
|
||||||
return warp10.ingest(
|
|
||||||
{
|
|
||||||
className: metricClass,
|
|
||||||
labels: { origin: config.nodeId },
|
|
||||||
}, records,
|
|
||||||
);
|
|
||||||
});
|
|
||||||
assert.strictEqual(status, records.length);
|
|
||||||
await this._cache.deleteShard(shard);
|
|
||||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
|
||||||
} else {
|
|
||||||
logger.debug('No events found in shard, cleaning up');
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const shardAge = now() - shard;
|
||||||
|
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
|
||||||
|
const metricClass = areSlowEvents ? 'utapi.repair.checkpoint' : 'utapi.checkpoint';
|
||||||
|
|
||||||
|
if (areSlowEvents) {
|
||||||
|
logger.info('Detected slow records, ingesting as repair');
|
||||||
|
}
|
||||||
|
|
||||||
|
const checkpoints = [];
|
||||||
|
const checkpointTimestamp = areSlowEvents ? now() : shard;
|
||||||
|
|
||||||
|
Object.entries(factory.checkpoints())
|
||||||
|
.forEach(([level, chkpts]) => {
|
||||||
|
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
|
||||||
|
const data = new UtapiRecord({
|
||||||
|
...checkpoint,
|
||||||
|
timestamp: checkpointTimestamp,
|
||||||
|
});
|
||||||
|
checkpoints.push({
|
||||||
|
className: metricClass,
|
||||||
|
valueType: warp10RecordType,
|
||||||
|
labels: {
|
||||||
|
origin: config.nodeId,
|
||||||
|
[eventFieldsToWarp10[level]]: resource,
|
||||||
|
},
|
||||||
|
data,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
let ingestedIntoNodeId;
|
||||||
|
const status = await this.withWarp10(async warp10 => {
|
||||||
|
// eslint-disable-next-line prefer-destructuring
|
||||||
|
ingestedIntoNodeId = warp10.nodeId;
|
||||||
|
return warp10.ingest(checkpoints);
|
||||||
|
});
|
||||||
|
assert.strictEqual(status, checkpoints.length);
|
||||||
|
await this._cache.deleteShard(shard);
|
||||||
|
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||||
} else {
|
} else {
|
||||||
logger.warn('shard does not exist', { shard });
|
logger.warn('shard does not exist', { shard });
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ const {
|
||||||
now,
|
now,
|
||||||
convertTimestamp,
|
convertTimestamp,
|
||||||
comprehend,
|
comprehend,
|
||||||
|
streamToAsyncIter,
|
||||||
} = require('../utils');
|
} = require('../utils');
|
||||||
|
|
||||||
const REDIS_CHUNKSIZE = 50;
|
const REDIS_CHUNKSIZE = 50;
|
||||||
|
@ -54,34 +55,9 @@ class MigrateTask extends BaseTask {
|
||||||
return parseInt(value, 10);
|
return parseInt(value, 10);
|
||||||
}
|
}
|
||||||
|
|
||||||
static async* _iterStream(stream) {
|
|
||||||
let finished = false;
|
|
||||||
let data;
|
|
||||||
|
|
||||||
stream.on('end', () => { finished = true; });
|
|
||||||
stream.pause();
|
|
||||||
while (!finished) {
|
|
||||||
data = await new Promise(resolve => {
|
|
||||||
const _resolve = jsutil.once(resolve);
|
|
||||||
const end = () => _resolve([]);
|
|
||||||
stream.once('end', end);
|
|
||||||
stream.once('data', _data => {
|
|
||||||
stream.pause();
|
|
||||||
stream.off('end', end);
|
|
||||||
_resolve(_data);
|
|
||||||
});
|
|
||||||
stream.resume();
|
|
||||||
});
|
|
||||||
|
|
||||||
for (const item of data) {
|
|
||||||
yield item;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async* _iterResources(level) {
|
async* _iterResources(level) {
|
||||||
const redis = this._redis._redis;
|
const redis = this._redis._redis;
|
||||||
const keys = MigrateTask._iterStream(redis.scanStream({
|
const keys = streamToAsyncIter(redis.scanStream({
|
||||||
count: 100,
|
count: 100,
|
||||||
match: `s3:${level}:*:storageUtilized`,
|
match: `s3:${level}:*:storageUtilized`,
|
||||||
}));
|
}));
|
||||||
|
|
|
@ -1,37 +0,0 @@
|
||||||
const BaseTask = require('./BaseTask');
|
|
||||||
const config = require('../config');
|
|
||||||
const { LoggerContext } = require('../utils');
|
|
||||||
const { repairLagSecs, indexedEventFields } = require('../constants');
|
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
|
||||||
module: 'Repair',
|
|
||||||
});
|
|
||||||
|
|
||||||
class RepairTask extends BaseTask {
|
|
||||||
constructor(options) {
|
|
||||||
super(options);
|
|
||||||
this._defaultSchedule = config.repairSchedule;
|
|
||||||
this._defaultLag = repairLagSecs;
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
|
||||||
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
|
||||||
|
|
||||||
const status = await this.withWarp10(warp10 => {
|
|
||||||
const params = {
|
|
||||||
params: {
|
|
||||||
nodeId: warp10.nodeId,
|
|
||||||
end: timestamp.toString(),
|
|
||||||
fields: indexedEventFields,
|
|
||||||
},
|
|
||||||
macro: 'utapi/repairRecords',
|
|
||||||
};
|
|
||||||
return warp10.exec(params);
|
|
||||||
});
|
|
||||||
if (status.result[0]) {
|
|
||||||
logger.info(`created ${status.result[0]} corrections`);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = RepairTask;
|
|
|
@ -1,8 +1,6 @@
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const IngestShard = require('./IngestShard');
|
const IngestShard = require('./IngestShard');
|
||||||
const CreateCheckpoint = require('./CreateCheckpoint');
|
|
||||||
const CreateSnapshot = require('./CreateSnapshot');
|
const CreateSnapshot = require('./CreateSnapshot');
|
||||||
const RepairTask = require('./Repair');
|
|
||||||
const ReindexTask = require('./Reindex');
|
const ReindexTask = require('./Reindex');
|
||||||
const MigrateTask = require('./Migrate');
|
const MigrateTask = require('./Migrate');
|
||||||
const MonitorDiskUsage = require('./DiskUsage');
|
const MonitorDiskUsage = require('./DiskUsage');
|
||||||
|
@ -11,9 +9,7 @@ const ManualAdjust = require('./ManualAdjust');
|
||||||
module.exports = {
|
module.exports = {
|
||||||
IngestShard,
|
IngestShard,
|
||||||
BaseTask,
|
BaseTask,
|
||||||
CreateCheckpoint,
|
|
||||||
CreateSnapshot,
|
CreateSnapshot,
|
||||||
RepairTask,
|
|
||||||
ReindexTask,
|
ReindexTask,
|
||||||
MigrateTask,
|
MigrateTask,
|
||||||
MonitorDiskUsage,
|
MonitorDiskUsage,
|
||||||
|
|
|
@ -3,6 +3,7 @@ const shard = require('./shard');
|
||||||
const timestamp = require('./timestamp');
|
const timestamp = require('./timestamp');
|
||||||
const func = require('./func');
|
const func = require('./func');
|
||||||
const disk = require('./disk');
|
const disk = require('./disk');
|
||||||
|
const stream = require('./stream');
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
...log,
|
...log,
|
||||||
|
@ -10,4 +11,5 @@ module.exports = {
|
||||||
...timestamp,
|
...timestamp,
|
||||||
...func,
|
...func,
|
||||||
...disk,
|
...disk,
|
||||||
|
...stream,
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a unix style timestamp floored to 10 second resolution
|
* Returns a unix style timestamp converted to a configurable resolution
|
||||||
|
* Represents a timespan of interval size ending at the returned value.
|
||||||
* @param {Number} timestamp - Unix timestamp with millisecond/microsecond resolution
|
* @param {Number} timestamp - Unix timestamp with millisecond/microsecond resolution
|
||||||
* @returns {Number} - Unix timestamp representing beginning of shard
|
* @returns {Number} - Unix timestamp representing beginning of shard
|
||||||
*/
|
*/
|
||||||
|
@ -10,7 +11,8 @@ function shardFromTimestamp(timestamp) {
|
||||||
if (timestamp > 1000000000000000) { // handle microsecond resolution
|
if (timestamp > 1000000000000000) { // handle microsecond resolution
|
||||||
interval = config.ingestionShardSize * 1000000;
|
interval = config.ingestionShardSize * 1000000;
|
||||||
}
|
}
|
||||||
return timestamp - (timestamp % interval);
|
|
||||||
|
return Math.ceil(timestamp / interval) * interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
|
/* eslint-disable no-loop-func */
|
||||||
|
/* eslint-disable no-await-in-loop */
|
||||||
|
|
||||||
|
const { jsutil } = require('arsenal');
|
||||||
|
|
||||||
|
async function* streamToAsyncIter(stream) {
|
||||||
|
let finished = false;
|
||||||
|
let data;
|
||||||
|
|
||||||
|
stream.on('end', () => { finished = true; });
|
||||||
|
stream.pause();
|
||||||
|
while (!finished) {
|
||||||
|
data = await new Promise((resolve, reject) => {
|
||||||
|
const _resolve = jsutil.once(resolve);
|
||||||
|
const _reject = jsutil.once(reject);
|
||||||
|
const end = () => _resolve([]);
|
||||||
|
stream.once('end', end);
|
||||||
|
stream.once('error', _reject);
|
||||||
|
stream.once('data', _data => {
|
||||||
|
stream.pause();
|
||||||
|
stream.off('end', end);
|
||||||
|
stream.off('error', _reject);
|
||||||
|
_resolve(_data);
|
||||||
|
});
|
||||||
|
stream.resume();
|
||||||
|
});
|
||||||
|
|
||||||
|
for (const item of data) {
|
||||||
|
yield item;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
streamToAsyncIter,
|
||||||
|
};
|
|
@ -38,18 +38,33 @@ class Warp10Client {
|
||||||
}
|
}
|
||||||
|
|
||||||
async ingest(metadata, events) {
|
async ingest(metadata, events) {
|
||||||
const { className, valueType, labels } = metadata;
|
let payload;
|
||||||
assert.notStrictEqual(className, undefined, 'you must provide a className');
|
// If two arguments are provided
|
||||||
const payload = events.map(
|
if (events !== undefined) {
|
||||||
ev => this._buildGTSEntry(
|
const { className, valueType, labels } = metadata;
|
||||||
className,
|
assert.notStrictEqual(className, undefined, 'you must provide a className');
|
||||||
valueType || warp10EventType,
|
payload = events.map(
|
||||||
labels || {},
|
ev => this._buildGTSEntry(
|
||||||
ev,
|
className,
|
||||||
),
|
valueType || warp10EventType,
|
||||||
);
|
labels || {},
|
||||||
|
ev,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
// If only one is provided
|
||||||
|
} else {
|
||||||
|
payload = metadata.map(
|
||||||
|
ev => this._buildGTSEntry(
|
||||||
|
ev.className,
|
||||||
|
ev.valueType || warp10EventType,
|
||||||
|
ev.labels || {},
|
||||||
|
ev.data,
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
const res = await this.update(payload);
|
const res = await this.update(payload);
|
||||||
return res.count;
|
return res.count;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_buildScriptEntry(params) {
|
_buildScriptEntry(params) {
|
||||||
|
|
|
@ -71,9 +71,7 @@
|
||||||
"start": "node server.js",
|
"start": "node server.js",
|
||||||
"test": "mocha --recursive tests/unit",
|
"test": "mocha --recursive tests/unit",
|
||||||
"start_v2:task:ingest": "ENABLE_UTAPI_V2=1 node bin/ingestShards.js",
|
"start_v2:task:ingest": "ENABLE_UTAPI_V2=1 node bin/ingestShards.js",
|
||||||
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
|
|
||||||
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
||||||
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
|
|
||||||
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
|
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
|
||||||
"start_v2:task:migrate": "ENABLE_UTAPI_V2=1 node bin/migrate.js",
|
"start_v2:task:migrate": "ENABLE_UTAPI_V2=1 node bin/migrate.js",
|
||||||
"start_v2:task:disk": "ENABLE_UTAPI_V2=1 node bin/diskUsage.js",
|
"start_v2:task:disk": "ENABLE_UTAPI_V2=1 node bin/diskUsage.js",
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
@utapi/writeCanary
|
|
@ -38,7 +38,7 @@
|
||||||
'utapi.snapshot.master' 'master_snapshot_class' STORE
|
'utapi.snapshot.master' 'master_snapshot_class' STORE
|
||||||
'utapi.snapshot' 'snapshot_class' STORE
|
'utapi.snapshot' 'snapshot_class' STORE
|
||||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||||
'utapi.repair.correction' 'correction_class' STORE
|
'utapi.repair.checkpoint' 'correction_class' STORE
|
||||||
|
|
||||||
// Fetch latest master snapshot
|
// Fetch latest master snapshot
|
||||||
$read_token $master_snapshot_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore
|
$read_token $master_snapshot_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore
|
||||||
|
@ -54,6 +54,7 @@
|
||||||
".app"
|
".app"
|
||||||
".producer"
|
".producer"
|
||||||
".owner"
|
".owner"
|
||||||
|
"origin"
|
||||||
] ->SET 'ignoredLabels' STORE
|
] ->SET 'ignoredLabels' STORE
|
||||||
|
|
||||||
// Search for available snapshots
|
// Search for available snapshots
|
|
@ -87,7 +87,7 @@
|
||||||
$opsResults SWAP $op PUT DROP
|
$opsResults SWAP $op PUT DROP
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
|
|
||||||
$results $opsResults 'operations' PUT
|
$results $opsResults 'operations' PUT DROP
|
||||||
$results ->JSON
|
$results ->JSON
|
||||||
// DUP LOGMSG
|
// DUP LOGMSG
|
||||||
|
|
|
@ -35,11 +35,9 @@
|
||||||
$operation_info 'node' GET 'nodeID' STORE
|
$operation_info 'node' GET 'nodeID' STORE
|
||||||
$operation_info 'no_reindex' GET true == 'no_reindex' STORE
|
$operation_info 'no_reindex' GET true == 'no_reindex' STORE
|
||||||
|
|
||||||
'utapi.event' 'event_class' STORE
|
|
||||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||||
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
|
|
||||||
'utapi.snapshot' 'snapshot_class' STORE
|
'utapi.snapshot' 'snapshot_class' STORE
|
||||||
'utapi.repair.correction' 'correction_class' STORE
|
'utapi.repair.checkpoint' 'correction_class' STORE
|
||||||
'utapi.repair.reindex' 'reindex_class' STORE
|
'utapi.repair.reindex' 'reindex_class' STORE
|
||||||
|
|
||||||
{} 'snapshots' STORE
|
{} 'snapshots' STORE
|
||||||
|
@ -128,8 +126,6 @@
|
||||||
'end' $endTimestamp
|
'end' $endTimestamp
|
||||||
'start' $startTimestamp 1 +
|
'start' $startTimestamp 1 +
|
||||||
} FETCH
|
} FETCH
|
||||||
|
|
||||||
|
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
|
|
||||||
]
|
]
|
||||||
|
@ -153,93 +149,6 @@
|
||||||
] REDUCE
|
] REDUCE
|
||||||
DROP
|
DROP
|
||||||
|
|
||||||
// 'results: ' $results ->JSON + LOGMSG
|
|
||||||
|
|
||||||
// 'loading master checkpoints' LOGMSG
|
|
||||||
|
|
||||||
// Load the most recent master checkpoint before our target timestamp from each node
|
|
||||||
{} 'masterCheckpoints' STORE
|
|
||||||
{
|
|
||||||
'token' $read_token
|
|
||||||
'class' $master_checkpoint_class
|
|
||||||
'labels' {}
|
|
||||||
'end' $endTimestamp
|
|
||||||
'count' 1
|
|
||||||
}
|
|
||||||
FETCH
|
|
||||||
<%
|
|
||||||
$masterCheckpoints SWAP
|
|
||||||
DUP LASTTICK SWAP
|
|
||||||
LABELS 'node' GET
|
|
||||||
PUT DROP
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
// 'loaded master checkpoints: '
|
|
||||||
// $masterCheckpoints ->JSON + LOGMSG
|
|
||||||
// $labels ->JSON LOGMSG
|
|
||||||
|
|
||||||
// Find all nodes containing events
|
|
||||||
'load_events' SECTION
|
|
||||||
[
|
|
||||||
$read_token
|
|
||||||
$event_class
|
|
||||||
{}
|
|
||||||
] FINDSETS
|
|
||||||
DROP SWAP DROP
|
|
||||||
'node' GET
|
|
||||||
<% DUP ISNULL %>
|
|
||||||
<% DROP [] %> IFT
|
|
||||||
// Load events from each node
|
|
||||||
<%
|
|
||||||
'key' STORE
|
|
||||||
|
|
||||||
// Get the timestamp of the master checkpoint for the node if it exists
|
|
||||||
// If no master checkpoint exists for a node then we can infer that no
|
|
||||||
// snapshots exists for that node as well, and we must start at 0
|
|
||||||
$masterCheckpoints $key GET
|
|
||||||
<% DUP TYPEOF 'LONG' != %>
|
|
||||||
<% DROP -1 %> IFT
|
|
||||||
'startTimestamp' STORE
|
|
||||||
{
|
|
||||||
'token' $read_token
|
|
||||||
'class' $event_class
|
|
||||||
'labels' { 'node' $key }
|
|
||||||
'end' $endTimestamp
|
|
||||||
'start' $startTimestamp 1 +
|
|
||||||
} FETCH
|
|
||||||
<% // Handle multiple GTS
|
|
||||||
VALUES
|
|
||||||
<%
|
|
||||||
@utapi/decodeEvent 'event' STORE
|
|
||||||
true 'passed' STORE
|
|
||||||
$labels KEYLIST
|
|
||||||
<%
|
|
||||||
'labelKey' STORE
|
|
||||||
<% $labels $labelKey GET $event $labelKey GET != %>
|
|
||||||
<%
|
|
||||||
false 'passed' STORE
|
|
||||||
BREAK
|
|
||||||
%> IFT
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
<% $passed %>
|
|
||||||
<%
|
|
||||||
$results
|
|
||||||
'objD' $event 'objD' 0 @util/getDefault @util/sumField
|
|
||||||
'sizeD' $event 'sizeD' 0 @util/getDefault @util/sumField
|
|
||||||
'inB' $event 'inB' 0 @util/getDefault @util/sumField
|
|
||||||
'outB' $event 'outB' 0 @util/getDefault @util/sumField
|
|
||||||
|
|
||||||
'ops' GET 'resultOps' STORE
|
|
||||||
$resultOps $event 'op' GET 1 @util/sumField
|
|
||||||
DROP
|
|
||||||
%> IFT
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
// $results ->JSON LOGMSG
|
|
||||||
|
|
||||||
// Find all nodes containing corrections
|
// Find all nodes containing corrections
|
||||||
'load_corrections' SECTION
|
'load_corrections' SECTION
|
||||||
[
|
[
|
|
@ -0,0 +1,36 @@
|
||||||
|
{
|
||||||
|
'name' 'utapi/writeCanary'
|
||||||
|
'desc'
|
||||||
|
<'
|
||||||
|
Write a canary record containing the current timestamp labeled with the nodeId
|
||||||
|
'>
|
||||||
|
'sig' [ [ [ ] [ ] ] ] // Signature
|
||||||
|
'params' {}
|
||||||
|
'examples' [
|
||||||
|
<'
|
||||||
|
@utapi/writeCanary
|
||||||
|
'>
|
||||||
|
]
|
||||||
|
} 'info' STORE
|
||||||
|
|
||||||
|
<%
|
||||||
|
!$info INFO
|
||||||
|
SAVE 'context' STORE
|
||||||
|
<%
|
||||||
|
'nodeId' MACROCONFIG 'nodeId' STORE
|
||||||
|
NEWGTS
|
||||||
|
'utapi.canary' RENAME
|
||||||
|
{ 'nodeId' $nodeId } RELABEL
|
||||||
|
NOW NaN NaN NaN true ADDVALUE
|
||||||
|
'writeTokenStatic' UPDATE
|
||||||
|
%>
|
||||||
|
<% // catch any exception
|
||||||
|
RETHROW
|
||||||
|
%>
|
||||||
|
<% // finally, restore the context
|
||||||
|
$context RESTORE
|
||||||
|
%> TRY
|
||||||
|
%>
|
||||||
|
'macro' STORE
|
||||||
|
|
||||||
|
$macro
|
|
@ -1,160 +0,0 @@
|
||||||
{
|
|
||||||
'name' 'utapi/createCheckPoint'
|
|
||||||
'desc'
|
|
||||||
<'
|
|
||||||
Scans recent events and generates multiple checkpoint GTS based on the value of the provided fields
|
|
||||||
'>
|
|
||||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
|
||||||
'params' {
|
|
||||||
// Signature params description
|
|
||||||
'a' 'Map containing read/write tokens'
|
|
||||||
'o' 'Map containing operation info'
|
|
||||||
'c' 'List of created checkpoints'
|
|
||||||
}
|
|
||||||
'examples' [
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
]
|
|
||||||
} 'info' STORE
|
|
||||||
|
|
||||||
<%
|
|
||||||
!$info INFO
|
|
||||||
SAVE 'context' STORE
|
|
||||||
<%
|
|
||||||
// 'Creating checkpoints' LOGMSG
|
|
||||||
JSON-> 'operation_info' STORE
|
|
||||||
JSON-> 'auth_info' STORE
|
|
||||||
|
|
||||||
$auth_info 'write' GET 'write_token' STORE
|
|
||||||
$auth_info 'read' GET 'read_token' STORE
|
|
||||||
|
|
||||||
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
|
|
||||||
$operation_info 'nodeId' GET 'nodeId' STORE
|
|
||||||
{ 'node' $nodeId } 'filterLabels' STORE
|
|
||||||
|
|
||||||
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
|
|
||||||
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
|
|
||||||
|
|
||||||
// Grab our passed timestamp and store it as the variable `endTimestamp`
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
|
||||||
|
|
||||||
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
|
|
||||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
|
||||||
'utapi.event' 'metric_class' STORE
|
|
||||||
|
|
||||||
$read_token $master_checkpoint_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master checkpoint
|
|
||||||
FIRSTTICK
|
|
||||||
// If we found a checkpoint, increment its timestamp
|
|
||||||
<% DUP 0 > %>
|
|
||||||
<% 1 + %> IFT
|
|
||||||
'startTimestamp' STORE // Grab our starting timestamp from the last checkpoint (0 if no checkpoints)
|
|
||||||
|
|
||||||
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
|
||||||
// 'Using ' $endTimestamp TOSTRING + ' as endTimestamp' + LOGMSG
|
|
||||||
{} 'results' STORE # Create an empty map for results
|
|
||||||
|
|
||||||
$fieldsToIndex
|
|
||||||
<% // For each field create an empty map in results
|
|
||||||
'field' STORE
|
|
||||||
$results {} $field PUT DROP
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
// Fetch all events since the last checkpoint
|
|
||||||
{
|
|
||||||
'token' $read_token
|
|
||||||
'class' $metric_class
|
|
||||||
'labels' $filterLabels
|
|
||||||
'start' $startTimestamp
|
|
||||||
'end' $endTimestamp
|
|
||||||
} FETCH
|
|
||||||
<% DUP SIZE 0 > %>
|
|
||||||
<%
|
|
||||||
<%
|
|
||||||
<% // Iter over events
|
|
||||||
'event' STORE // Store the event
|
|
||||||
$event 4 GET @utapi/decodeEvent 'decoded' STORE
|
|
||||||
// 'Including event ' $event 0 GET TOSTRING + ' ' + $decoded ->JSON + LOGMSG
|
|
||||||
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
|
|
||||||
$decoded 'op' GET 'operationId' STORE
|
|
||||||
|
|
||||||
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
|
|
||||||
<%
|
|
||||||
'field' STORE // Store the field
|
|
||||||
$decoded $field GET 'fieldValue' STORE // Store the fields value
|
|
||||||
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
|
|
||||||
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
|
|
||||||
<%
|
|
||||||
$fieldResults $fieldValue GET // Grab the existing map for the checkpoint and leave it on the stack
|
|
||||||
%>
|
|
||||||
<%
|
|
||||||
// Push empty checkpoint onto stack
|
|
||||||
{
|
|
||||||
'objD' 0
|
|
||||||
'sizeD' 0
|
|
||||||
'inB' 0
|
|
||||||
'outB' 0
|
|
||||||
'ops' {}
|
|
||||||
} 'fieldValueResults' STORE
|
|
||||||
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
|
|
||||||
$fieldValueResults // Leave it on the stack
|
|
||||||
%> IFTE
|
|
||||||
// Consumes a map off the stack summing the specified field and the passed value
|
|
||||||
// Leaves the modified map on the stack
|
|
||||||
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
|
|
||||||
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
|
|
||||||
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
|
|
||||||
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
|
|
||||||
|
|
||||||
// Grab our op count map
|
|
||||||
'ops' GET 'opsCount' STORE
|
|
||||||
$opsCount $operationId 1 @util/sumField
|
|
||||||
DROP // Drop the returned map from sumField
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
%>
|
|
||||||
<%
|
|
||||||
DROP 0 STOP
|
|
||||||
%> IFTE
|
|
||||||
|
|
||||||
0 'checkpoints' STORE
|
|
||||||
// For each of our indexed fields
|
|
||||||
$results KEYLIST
|
|
||||||
<%
|
|
||||||
'field' STORE
|
|
||||||
$results $field GET 'fieldResults' STORE
|
|
||||||
// For each unique value seen
|
|
||||||
$fieldResults KEYLIST
|
|
||||||
<%
|
|
||||||
'fieldValue' STORE
|
|
||||||
// Encode the results
|
|
||||||
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
|
|
||||||
// 'Created checkpoint ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
|
|
||||||
// And create our GTS
|
|
||||||
NEWGTS $checkpoint_class RENAME
|
|
||||||
$endTimestamp NaN NaN NaN $value ADDVALUE
|
|
||||||
{ 'node' $nodeId $field $fieldValue } RELABEL
|
|
||||||
$write_token UPDATE
|
|
||||||
$checkpoints 1 + 'checkpoints' STORE
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
NEWGTS $master_checkpoint_class RENAME
|
|
||||||
$endTimestamp NaN NaN NaN 0 ADDVALUE
|
|
||||||
{ 'node' $nodeId } RELABEL
|
|
||||||
$write_token UPDATE
|
|
||||||
|
|
||||||
$checkpoints // Leave the number of created checkpoints on the stack
|
|
||||||
|
|
||||||
%>
|
|
||||||
<% // catch any exception
|
|
||||||
RETHROW
|
|
||||||
%>
|
|
||||||
<% // finally, restore the context
|
|
||||||
$context RESTORE
|
|
||||||
%> TRY
|
|
||||||
%>
|
|
||||||
'macro' STORE
|
|
||||||
|
|
||||||
$macro
|
|
|
@ -1,157 +0,0 @@
|
||||||
{
|
|
||||||
'name' 'utapi/repairRecord'
|
|
||||||
'desc'
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
'sig' [ [ [ ] [ ] ] ] // Signature
|
|
||||||
'params' {
|
|
||||||
// Signature params description
|
|
||||||
}
|
|
||||||
'examples' [
|
|
||||||
<'
|
|
||||||
|
|
||||||
'>
|
|
||||||
]
|
|
||||||
} 'info' STORE
|
|
||||||
|
|
||||||
<%
|
|
||||||
!$info INFO
|
|
||||||
SAVE 'context' STORE
|
|
||||||
<%
|
|
||||||
// 'Checking for repairs' LOGMSG
|
|
||||||
JSON-> 'operation_info' STORE
|
|
||||||
JSON-> 'auth_info' STORE
|
|
||||||
|
|
||||||
$auth_info 'write' GET 'write_token' STORE
|
|
||||||
$auth_info 'read' GET 'read_token' STORE
|
|
||||||
|
|
||||||
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
|
|
||||||
$operation_info 'nodeId' GET 'nodeId' STORE
|
|
||||||
{ 'node' $nodeId } 'filterLabels' STORE
|
|
||||||
|
|
||||||
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
|
|
||||||
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
|
|
||||||
|
|
||||||
// Grab our passed timestamp and store it as the variable `endTimestamp`
|
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
|
||||||
|
|
||||||
'utapi.repair.master' 'master_repair_class' STORE
|
|
||||||
'utapi.repair.correction' 'correction_class' STORE
|
|
||||||
'utapi.repair.event' 'metric_class' STORE
|
|
||||||
|
|
||||||
$read_token $master_repair_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master repair
|
|
||||||
FIRSTTICK
|
|
||||||
// If we found a repair, increment its timestamp so we start at the tick immediatly after
|
|
||||||
<% DUP 0 > %>
|
|
||||||
<% 1 + %> IFT
|
|
||||||
'startTimestamp' STORE // Grab our starting timestamp from the last repair (0 if no repairs)
|
|
||||||
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
|
||||||
|
|
||||||
{} 'results' STORE # Create an empty map for results
|
|
||||||
|
|
||||||
$fieldsToIndex
|
|
||||||
<% // For each field create an empty map in results
|
|
||||||
'field' STORE
|
|
||||||
$results {} $field PUT DROP
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
// Fetch all events since the last repair
|
|
||||||
{
|
|
||||||
'token' $read_token
|
|
||||||
'class' $metric_class
|
|
||||||
'labels' $filterLabels
|
|
||||||
'start' $startTimestamp
|
|
||||||
'end' $endTimestamp
|
|
||||||
} FETCH
|
|
||||||
// STOP
|
|
||||||
<% DUP SIZE 0 > %>
|
|
||||||
<%
|
|
||||||
0 GET
|
|
||||||
<%
|
|
||||||
'event' STORE
|
|
||||||
$event 4 GET @utapi/decodeEvent 'decoded' STORE
|
|
||||||
// 'Including event ' $decoded 'id' GET + LOGMSG
|
|
||||||
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
|
|
||||||
$decoded 'op' GET 'operationId' STORE
|
|
||||||
|
|
||||||
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
|
|
||||||
<%
|
|
||||||
'field' STORE // Store the field
|
|
||||||
$decoded $field GET 'fieldValue' STORE // Store the fields value
|
|
||||||
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
|
|
||||||
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
|
|
||||||
<%
|
|
||||||
$fieldResults $fieldValue GET // Grab the existing map for the correction and leave it on the stack
|
|
||||||
%>
|
|
||||||
<%
|
|
||||||
// Push empty correction onto stack
|
|
||||||
{
|
|
||||||
'objD' 0
|
|
||||||
'sizeD' 0
|
|
||||||
'inB' 0
|
|
||||||
'outB' 0
|
|
||||||
'ops' {}
|
|
||||||
} 'fieldValueResults' STORE
|
|
||||||
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
|
|
||||||
$fieldValueResults // Leave it on the stack
|
|
||||||
%> IFTE
|
|
||||||
// Consumes a map off the stack summing the specified field and the passed value
|
|
||||||
// Leaves the modified map on the stack
|
|
||||||
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
|
|
||||||
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
|
|
||||||
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
|
|
||||||
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
|
|
||||||
|
|
||||||
// Grab our op count map
|
|
||||||
'ops' GET 'opsCount' STORE
|
|
||||||
$opsCount $operationId 1 @util/sumField
|
|
||||||
DROP // Drop the returned map from sumField
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
%>
|
|
||||||
<%
|
|
||||||
DROP 0 STOP
|
|
||||||
%> IFTE
|
|
||||||
|
|
||||||
0 'corrections' STORE
|
|
||||||
$results KEYLIST
|
|
||||||
<%
|
|
||||||
'field' STORE
|
|
||||||
$results $field GET 'fieldResults' STORE
|
|
||||||
// For each unique value seen
|
|
||||||
$fieldResults KEYLIST
|
|
||||||
<%
|
|
||||||
'fieldValue' STORE
|
|
||||||
// Encode the results
|
|
||||||
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
|
|
||||||
// 'Created correction ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
|
|
||||||
// And create our GTS
|
|
||||||
NEWGTS $correction_class RENAME
|
|
||||||
$endTimestamp NaN NaN NaN $value ADDVALUE
|
|
||||||
{ 'node' $nodeId $field $fieldValue } RELABEL
|
|
||||||
$write_token UPDATE
|
|
||||||
$corrections 1 + 'corrections' STORE
|
|
||||||
%> FOREACH
|
|
||||||
%> FOREACH
|
|
||||||
|
|
||||||
NEWGTS $master_repair_class RENAME
|
|
||||||
$endTimestamp NaN NaN NaN 0 ADDVALUE
|
|
||||||
{ 'node' $nodeId } RELABEL
|
|
||||||
$write_token UPDATE
|
|
||||||
|
|
||||||
$corrections // Leave the number of created corrections on the stack
|
|
||||||
|
|
||||||
%>
|
|
||||||
<% // catch any exception
|
|
||||||
RETHROW
|
|
||||||
%>
|
|
||||||
<% // finally, restore the context
|
|
||||||
$context RESTORE
|
|
||||||
%> TRY
|
|
||||||
%>
|
|
||||||
'macro' STORE
|
|
||||||
|
|
||||||
// Unit tests
|
|
||||||
|
|
||||||
$macro
|
|
Loading…
Reference in New Issue