Compare commits
15 Commits
developmen
...
improvemen
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 071cbd03d0 | |
Taylor McKinnon | a09fce2467 | |
Taylor McKinnon | d0a9a32cb1 | |
Taylor McKinnon | 5dd3a7f68b | |
Taylor McKinnon | 41935ac187 | |
Taylor McKinnon | 84366f98ad | |
Taylor McKinnon | 802434f312 | |
Taylor McKinnon | 74ccd4e8ca | |
Taylor McKinnon | dec8f68ae3 | |
Taylor McKinnon | e13e428dec | |
Taylor McKinnon | 034baca47a | |
Taylor McKinnon | 3da9b8642e | |
Taylor McKinnon | 26b7dc5284 | |
Taylor McKinnon | 973e99f1d5 | |
Taylor McKinnon | b4741d7382 |
|
@ -1,15 +0,0 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'CreateCheckpoint',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting checkpoint creation'))
|
||||
.then(() => task.start())
|
||||
.then(() => logger.info('Checkpoint creation started'));
|
|
@ -1,15 +0,0 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Repair',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting Repair daemon'))
|
||||
.then(() => task.start())
|
||||
.then(() => logger.info('Repair started'));
|
|
@ -30,7 +30,8 @@ ENV SENSISION_PORT 8082
|
|||
ENV standalone.host 0.0.0.0
|
||||
ENV standalone.port 4802
|
||||
ENV standalone.home /opt/warp10
|
||||
ENV warpscript.repository.directory /usr/local/share/warpscript
|
||||
ENV warpscript.repository.directory /usr/local/share/warpscript/scality
|
||||
ENV runner.root /usr/local/share/warpscript/runners
|
||||
ENV warp.token.file /static.tokens
|
||||
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
|
||||
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
const schema = require('../schema');
|
||||
const constants = require('../../constants');
|
||||
|
||||
|
@ -48,31 +49,27 @@ class MemoryCache {
|
|||
}
|
||||
|
||||
async addToShard(shard, event) {
|
||||
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
|
||||
this._data[metricKey] = event;
|
||||
if (this._shards[shard]) {
|
||||
this._shards[shard].push(metricKey);
|
||||
this._shards[shard][event.uuid] = event;
|
||||
} else {
|
||||
this._shards[shard] = [metricKey];
|
||||
this._shards[shard] = { [event.uuid]: event };
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async getKeysInShard(shard) {
|
||||
return this._shards[shard] || [];
|
||||
return this._shards[shard] || Object.keys({});
|
||||
}
|
||||
|
||||
async fetchShard(shard) {
|
||||
if (this._shards[shard]) {
|
||||
return this._shards[shard].map(key => this._data[key]);
|
||||
async* fetchShard(shard) {
|
||||
const _shard = this._shards[shard] || {};
|
||||
for (const [key, value] of Object.entries(_shard)) {
|
||||
yield key;
|
||||
yield value;
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
async deleteShardAndKeys(shard) {
|
||||
(this._shards[shard] || []).forEach(key => {
|
||||
delete this._data[key];
|
||||
});
|
||||
delete this._shards[shard];
|
||||
return true;
|
||||
}
|
||||
|
@ -82,7 +79,7 @@ class MemoryCache {
|
|||
}
|
||||
|
||||
async shardExists(shard) {
|
||||
return this._shards[shard.toString()] !== undefined;
|
||||
return this._shards[shard] !== undefined;
|
||||
}
|
||||
|
||||
async updateCounters(metric) {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
const RedisClient = require('../../redis');
|
||||
const schema = require('../schema');
|
||||
|
||||
const { LoggerContext } = require('../../utils');
|
||||
const { LoggerContext, streamToAsyncIter } = require('../../utils');
|
||||
const constants = require('../../constants');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
|
@ -60,39 +61,34 @@ class RedisCache {
|
|||
const logger = moduleLogger.with({ method: 'addToShard' });
|
||||
return logger
|
||||
.logAsyncError(async () => {
|
||||
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
|
||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
||||
logger.debug('adding metric to shard', { metricKey, shardKey });
|
||||
logger.debug('adding metric to shard', { uuid: metric.uuid, shardKey });
|
||||
|
||||
const [setResults, saddResults] = await this._redis
|
||||
.call(redis => redis
|
||||
.multi([
|
||||
['set', metricKey, JSON.stringify(metric.getValue())],
|
||||
['sadd', shardKey, metricKey],
|
||||
['hset', shardKey, metric.uuid, JSON.stringify(metric.getValue())],
|
||||
['sadd', shardMasterKey, shardKey],
|
||||
])
|
||||
.exec());
|
||||
|
||||
let success = true;
|
||||
if (setResults[1] !== 'OK') {
|
||||
if (setResults[0] !== 1) {
|
||||
moduleLogger.error('failed to set metric key', {
|
||||
metricKey,
|
||||
uuid: metric.uuid,
|
||||
shardKey,
|
||||
res: setResults[1],
|
||||
res: setResults[0],
|
||||
});
|
||||
success = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
if (saddResults[1] !== 1) {
|
||||
moduleLogger.error('metric key already present in shard', {
|
||||
metricKey,
|
||||
moduleLogger.trace('shard key already present in master', {
|
||||
shardKey,
|
||||
res: saddResults[1],
|
||||
});
|
||||
success = false;
|
||||
}
|
||||
return success;
|
||||
return true;
|
||||
}, 'error during redis command');
|
||||
}
|
||||
|
||||
|
@ -105,16 +101,22 @@ class RedisCache {
|
|||
}, 'error while fetching shard keys', { shard });
|
||||
}
|
||||
|
||||
async fetchShard(shard) {
|
||||
return moduleLogger
|
||||
async* fetchShard(shard) {
|
||||
moduleLogger
|
||||
.with({ method: 'fetchShard' })
|
||||
.logAsyncError(async () => {
|
||||
const keys = await this.getKeysInShard(shard);
|
||||
if (!keys.length) {
|
||||
return [];
|
||||
.debug('fetching metrics from shard', { shard });
|
||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||
const redis = this._redis._redis;
|
||||
|
||||
// ioredis returns the key and value as separate items
|
||||
// so we need to filter and only yield the values
|
||||
let isValue = false;
|
||||
for await (const metric of streamToAsyncIter(redis.hscanStream(shardKey, { count: 1000 }))) {
|
||||
if (isValue) {
|
||||
yield JSON.parse(metric);
|
||||
}
|
||||
isValue = !isValue;
|
||||
}
|
||||
return this._redis.call(redis => redis.mget(...keys));
|
||||
}, 'error while fetching shard data', { shard });
|
||||
}
|
||||
|
||||
async deleteShardAndKeys(shard) {
|
||||
|
@ -123,10 +125,9 @@ class RedisCache {
|
|||
.logAsyncError(async () => {
|
||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
||||
const keys = await this.getKeysInShard(shard);
|
||||
return this._redis.call(
|
||||
redis => redis.multi([
|
||||
['del', shardKey, ...keys],
|
||||
['del', shardKey],
|
||||
['srem', shardMasterKey, shardKey],
|
||||
]).exec(),
|
||||
);
|
||||
|
|
|
@ -30,7 +30,10 @@ class CacheClient {
|
|||
return true;
|
||||
}
|
||||
|
||||
async getMetricsForShard(shard) {
|
||||
// backend returns an async iterator
|
||||
// no `async` keyword so it doesn't wrap it in a promise
|
||||
// eslint-disable-next-line require-yield
|
||||
getMetricsForShard(shard) {
|
||||
return this._cacheBackend.fetchShard(shard);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||
const { LoggerContext } = require('../utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'CreateCheckpoint',
|
||||
});
|
||||
|
||||
class CreateCheckpoint extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this._defaultSchedule = config.checkpointSchedule;
|
||||
this._defaultLag = checkpointLagSecs;
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
||||
const status = await this.withWarp10(async warp10 => {
|
||||
const params = {
|
||||
params: {
|
||||
nodeId: warp10.nodeId,
|
||||
end: timestamp.toString(),
|
||||
fields: indexedEventFields,
|
||||
},
|
||||
macro: 'utapi/createCheckpoint',
|
||||
};
|
||||
return warp10.exec(params);
|
||||
});
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = CreateCheckpoint;
|
|
@ -1,11 +1,13 @@
|
|||
/* eslint-disable no-restricted-globals */
|
||||
/* eslint-disable no-restricted-syntax */
|
||||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const { UtapiMetric } = require('../models');
|
||||
const { UtapiRecord } = require('../models');
|
||||
const config = require('../config');
|
||||
const { checkpointLagSecs } = require('../constants');
|
||||
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
|
||||
const {
|
||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
|
||||
} = require('../utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
|
@ -14,6 +16,43 @@ const logger = new LoggerContext({
|
|||
|
||||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||
|
||||
function orZero(value) {
|
||||
return value || 0;
|
||||
}
|
||||
|
||||
function _updateCheckpoint(checkpoint, metric) {
|
||||
const ops = checkpoint.operations || {};
|
||||
return {
|
||||
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
|
||||
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
|
||||
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
|
||||
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
|
||||
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
|
||||
};
|
||||
}
|
||||
|
||||
function _checkpointFactory(labels) {
|
||||
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
|
||||
let oldest = NaN;
|
||||
let newest = NaN;
|
||||
return {
|
||||
update: metric => {
|
||||
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
|
||||
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
|
||||
labels
|
||||
.filter(label => !!metric[label])
|
||||
.forEach(label => {
|
||||
const value = metric[label];
|
||||
const checkpoint = checkpoints[label][value] || {};
|
||||
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||
});
|
||||
},
|
||||
checkpoints: () => (checkpoints),
|
||||
oldest: () => (oldest),
|
||||
newest: () => (newest),
|
||||
};
|
||||
}
|
||||
|
||||
class IngestShardTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
|
@ -22,17 +61,6 @@ class IngestShardTask extends BaseTask {
|
|||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||
}
|
||||
|
||||
_hydrateEvent(data, stripTimestamp = false) {
|
||||
const event = JSON.parse(data);
|
||||
if (this._stripEventUUID) {
|
||||
delete event.uuid;
|
||||
}
|
||||
if (stripTimestamp) {
|
||||
delete event.timestamp;
|
||||
}
|
||||
return new UtapiMetric(event);
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
const endShard = shardFromTimestamp(timestamp);
|
||||
logger.debug('ingesting shards', { endShard });
|
||||
|
@ -50,43 +78,50 @@ class IngestShardTask extends BaseTask {
|
|||
await async.eachLimit(toIngest, 10,
|
||||
async shard => {
|
||||
if (await this._cache.shardExists(shard)) {
|
||||
const metrics = await this._cache.getMetricsForShard(shard);
|
||||
if (metrics.length > 0) {
|
||||
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
|
||||
const factory = _checkpointFactory(['bucket', 'account']);
|
||||
for await (const metric of this._cache.getMetricsForShard(shard)) {
|
||||
factory.update(metric);
|
||||
}
|
||||
|
||||
const shardAge = now() - shard;
|
||||
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
|
||||
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
|
||||
const metricClass = areSlowEvents ? 'utapi.repair.checkpoint' : 'utapi.checkpoint';
|
||||
|
||||
if (areSlowEvents) {
|
||||
logger.info('Detected slow records, ingesting as repair');
|
||||
}
|
||||
|
||||
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
|
||||
const checkpoints = [];
|
||||
const checkpointTimestamp = areSlowEvents ? now() : shard;
|
||||
|
||||
records.sort((a, b) => a.timestamp - b.timestamp);
|
||||
|
||||
const clock = new InterpolatedClock();
|
||||
records.forEach(r => {
|
||||
r.timestamp = clock.getTs(r.timestamp);
|
||||
Object.entries(factory.checkpoints())
|
||||
.forEach(([level, chkpts]) => {
|
||||
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
|
||||
const data = new UtapiRecord({
|
||||
...checkpoint,
|
||||
timestamp: checkpointTimestamp,
|
||||
});
|
||||
checkpoints.push({
|
||||
className: metricClass,
|
||||
valueType: warp10RecordType,
|
||||
labels: {
|
||||
origin: config.nodeId,
|
||||
[eventFieldsToWarp10[level]]: resource,
|
||||
},
|
||||
data,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
let ingestedIntoNodeId;
|
||||
const status = await this.withWarp10(async warp10 => {
|
||||
// eslint-disable-next-line prefer-destructuring
|
||||
ingestedIntoNodeId = warp10.nodeId;
|
||||
return warp10.ingest(
|
||||
{
|
||||
className: metricClass,
|
||||
labels: { origin: config.nodeId },
|
||||
}, records,
|
||||
);
|
||||
return warp10.ingest(checkpoints);
|
||||
});
|
||||
assert.strictEqual(status, records.length);
|
||||
assert.strictEqual(status, checkpoints.length);
|
||||
await this._cache.deleteShard(shard);
|
||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||
} else {
|
||||
logger.debug('No events found in shard, cleaning up');
|
||||
}
|
||||
} else {
|
||||
logger.warn('shard does not exist', { shard });
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ const {
|
|||
now,
|
||||
convertTimestamp,
|
||||
comprehend,
|
||||
streamToAsyncIter,
|
||||
} = require('../utils');
|
||||
|
||||
const REDIS_CHUNKSIZE = 50;
|
||||
|
@ -54,34 +55,9 @@ class MigrateTask extends BaseTask {
|
|||
return parseInt(value, 10);
|
||||
}
|
||||
|
||||
static async* _iterStream(stream) {
|
||||
let finished = false;
|
||||
let data;
|
||||
|
||||
stream.on('end', () => { finished = true; });
|
||||
stream.pause();
|
||||
while (!finished) {
|
||||
data = await new Promise(resolve => {
|
||||
const _resolve = jsutil.once(resolve);
|
||||
const end = () => _resolve([]);
|
||||
stream.once('end', end);
|
||||
stream.once('data', _data => {
|
||||
stream.pause();
|
||||
stream.off('end', end);
|
||||
_resolve(_data);
|
||||
});
|
||||
stream.resume();
|
||||
});
|
||||
|
||||
for (const item of data) {
|
||||
yield item;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async* _iterResources(level) {
|
||||
const redis = this._redis._redis;
|
||||
const keys = MigrateTask._iterStream(redis.scanStream({
|
||||
const keys = streamToAsyncIter(redis.scanStream({
|
||||
count: 100,
|
||||
match: `s3:${level}:*:storageUtilized`,
|
||||
}));
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { LoggerContext } = require('../utils');
|
||||
const { repairLagSecs, indexedEventFields } = require('../constants');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'Repair',
|
||||
});
|
||||
|
||||
class RepairTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
this._defaultSchedule = config.repairSchedule;
|
||||
this._defaultLag = repairLagSecs;
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
||||
|
||||
const status = await this.withWarp10(warp10 => {
|
||||
const params = {
|
||||
params: {
|
||||
nodeId: warp10.nodeId,
|
||||
end: timestamp.toString(),
|
||||
fields: indexedEventFields,
|
||||
},
|
||||
macro: 'utapi/repairRecords',
|
||||
};
|
||||
return warp10.exec(params);
|
||||
});
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0]} corrections`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = RepairTask;
|
|
@ -1,8 +1,6 @@
|
|||
const BaseTask = require('./BaseTask');
|
||||
const IngestShard = require('./IngestShard');
|
||||
const CreateCheckpoint = require('./CreateCheckpoint');
|
||||
const CreateSnapshot = require('./CreateSnapshot');
|
||||
const RepairTask = require('./Repair');
|
||||
const ReindexTask = require('./Reindex');
|
||||
const MigrateTask = require('./Migrate');
|
||||
const MonitorDiskUsage = require('./DiskUsage');
|
||||
|
@ -11,9 +9,7 @@ const ManualAdjust = require('./ManualAdjust');
|
|||
module.exports = {
|
||||
IngestShard,
|
||||
BaseTask,
|
||||
CreateCheckpoint,
|
||||
CreateSnapshot,
|
||||
RepairTask,
|
||||
ReindexTask,
|
||||
MigrateTask,
|
||||
MonitorDiskUsage,
|
||||
|
|
|
@ -3,6 +3,7 @@ const shard = require('./shard');
|
|||
const timestamp = require('./timestamp');
|
||||
const func = require('./func');
|
||||
const disk = require('./disk');
|
||||
const stream = require('./stream');
|
||||
|
||||
module.exports = {
|
||||
...log,
|
||||
|
@ -10,4 +11,5 @@ module.exports = {
|
|||
...timestamp,
|
||||
...func,
|
||||
...disk,
|
||||
...stream,
|
||||
};
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
const config = require('../config');
|
||||
|
||||
/**
|
||||
* Returns a unix style timestamp floored to 10 second resolution
|
||||
* Returns a unix style timestamp converted to a configurable resolution
|
||||
* Represents a timespan of interval size ending at the returned value.
|
||||
* @param {Number} timestamp - Unix timestamp with millisecond/microsecond resolution
|
||||
* @returns {Number} - Unix timestamp representing beginning of shard
|
||||
*/
|
||||
|
@ -10,7 +11,8 @@ function shardFromTimestamp(timestamp) {
|
|||
if (timestamp > 1000000000000000) { // handle microsecond resolution
|
||||
interval = config.ingestionShardSize * 1000000;
|
||||
}
|
||||
return timestamp - (timestamp % interval);
|
||||
|
||||
return Math.ceil(timestamp / interval) * interval;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
/* eslint-disable no-loop-func */
|
||||
/* eslint-disable no-await-in-loop */
|
||||
|
||||
const { jsutil } = require('arsenal');
|
||||
|
||||
async function* streamToAsyncIter(stream) {
|
||||
let finished = false;
|
||||
let data;
|
||||
|
||||
stream.on('end', () => { finished = true; });
|
||||
stream.pause();
|
||||
while (!finished) {
|
||||
data = await new Promise((resolve, reject) => {
|
||||
const _resolve = jsutil.once(resolve);
|
||||
const _reject = jsutil.once(reject);
|
||||
const end = () => _resolve([]);
|
||||
stream.once('end', end);
|
||||
stream.once('error', _reject);
|
||||
stream.once('data', _data => {
|
||||
stream.pause();
|
||||
stream.off('end', end);
|
||||
stream.off('error', _reject);
|
||||
_resolve(_data);
|
||||
});
|
||||
stream.resume();
|
||||
});
|
||||
|
||||
for (const item of data) {
|
||||
yield item;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
module.exports = {
|
||||
streamToAsyncIter,
|
||||
};
|
|
@ -38,9 +38,12 @@ class Warp10Client {
|
|||
}
|
||||
|
||||
async ingest(metadata, events) {
|
||||
let payload;
|
||||
// If two arguments are provided
|
||||
if (events !== undefined) {
|
||||
const { className, valueType, labels } = metadata;
|
||||
assert.notStrictEqual(className, undefined, 'you must provide a className');
|
||||
const payload = events.map(
|
||||
payload = events.map(
|
||||
ev => this._buildGTSEntry(
|
||||
className,
|
||||
valueType || warp10EventType,
|
||||
|
@ -48,8 +51,20 @@ class Warp10Client {
|
|||
ev,
|
||||
),
|
||||
);
|
||||
// If only one is provided
|
||||
} else {
|
||||
payload = metadata.map(
|
||||
ev => this._buildGTSEntry(
|
||||
ev.className,
|
||||
ev.valueType || warp10EventType,
|
||||
ev.labels || {},
|
||||
ev.data,
|
||||
),
|
||||
);
|
||||
}
|
||||
const res = await this.update(payload);
|
||||
return res.count;
|
||||
|
||||
}
|
||||
|
||||
_buildScriptEntry(params) {
|
||||
|
|
|
@ -71,9 +71,7 @@
|
|||
"start": "node server.js",
|
||||
"test": "mocha --recursive tests/unit",
|
||||
"start_v2:task:ingest": "ENABLE_UTAPI_V2=1 node bin/ingestShards.js",
|
||||
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
|
||||
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
||||
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
|
||||
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
|
||||
"start_v2:task:migrate": "ENABLE_UTAPI_V2=1 node bin/migrate.js",
|
||||
"start_v2:task:disk": "ENABLE_UTAPI_V2=1 node bin/diskUsage.js",
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
@utapi/writeCanary
|
|
@ -38,7 +38,7 @@
|
|||
'utapi.snapshot.master' 'master_snapshot_class' STORE
|
||||
'utapi.snapshot' 'snapshot_class' STORE
|
||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||
'utapi.repair.correction' 'correction_class' STORE
|
||||
'utapi.repair.checkpoint' 'correction_class' STORE
|
||||
|
||||
// Fetch latest master snapshot
|
||||
$read_token $master_snapshot_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore
|
||||
|
@ -54,6 +54,7 @@
|
|||
".app"
|
||||
".producer"
|
||||
".owner"
|
||||
"origin"
|
||||
] ->SET 'ignoredLabels' STORE
|
||||
|
||||
// Search for available snapshots
|
|
@ -87,7 +87,7 @@
|
|||
$opsResults SWAP $op PUT DROP
|
||||
%> FOREACH
|
||||
|
||||
$results $opsResults 'operations' PUT
|
||||
$results $opsResults 'operations' PUT DROP
|
||||
$results ->JSON
|
||||
// DUP LOGMSG
|
||||
|
|
@ -35,11 +35,9 @@
|
|||
$operation_info 'node' GET 'nodeID' STORE
|
||||
$operation_info 'no_reindex' GET true == 'no_reindex' STORE
|
||||
|
||||
'utapi.event' 'event_class' STORE
|
||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
|
||||
'utapi.snapshot' 'snapshot_class' STORE
|
||||
'utapi.repair.correction' 'correction_class' STORE
|
||||
'utapi.repair.checkpoint' 'correction_class' STORE
|
||||
'utapi.repair.reindex' 'reindex_class' STORE
|
||||
|
||||
{} 'snapshots' STORE
|
||||
|
@ -128,8 +126,6 @@
|
|||
'end' $endTimestamp
|
||||
'start' $startTimestamp 1 +
|
||||
} FETCH
|
||||
|
||||
|
||||
%> FOREACH
|
||||
|
||||
]
|
||||
|
@ -153,93 +149,6 @@
|
|||
] REDUCE
|
||||
DROP
|
||||
|
||||
// 'results: ' $results ->JSON + LOGMSG
|
||||
|
||||
// 'loading master checkpoints' LOGMSG
|
||||
|
||||
// Load the most recent master checkpoint before our target timestamp from each node
|
||||
{} 'masterCheckpoints' STORE
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $master_checkpoint_class
|
||||
'labels' {}
|
||||
'end' $endTimestamp
|
||||
'count' 1
|
||||
}
|
||||
FETCH
|
||||
<%
|
||||
$masterCheckpoints SWAP
|
||||
DUP LASTTICK SWAP
|
||||
LABELS 'node' GET
|
||||
PUT DROP
|
||||
%> FOREACH
|
||||
|
||||
// 'loaded master checkpoints: '
|
||||
// $masterCheckpoints ->JSON + LOGMSG
|
||||
// $labels ->JSON LOGMSG
|
||||
|
||||
// Find all nodes containing events
|
||||
'load_events' SECTION
|
||||
[
|
||||
$read_token
|
||||
$event_class
|
||||
{}
|
||||
] FINDSETS
|
||||
DROP SWAP DROP
|
||||
'node' GET
|
||||
<% DUP ISNULL %>
|
||||
<% DROP [] %> IFT
|
||||
// Load events from each node
|
||||
<%
|
||||
'key' STORE
|
||||
|
||||
// Get the timestamp of the master checkpoint for the node if it exists
|
||||
// If no master checkpoint exists for a node then we can infer that no
|
||||
// snapshots exists for that node as well, and we must start at 0
|
||||
$masterCheckpoints $key GET
|
||||
<% DUP TYPEOF 'LONG' != %>
|
||||
<% DROP -1 %> IFT
|
||||
'startTimestamp' STORE
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $event_class
|
||||
'labels' { 'node' $key }
|
||||
'end' $endTimestamp
|
||||
'start' $startTimestamp 1 +
|
||||
} FETCH
|
||||
<% // Handle multiple GTS
|
||||
VALUES
|
||||
<%
|
||||
@utapi/decodeEvent 'event' STORE
|
||||
true 'passed' STORE
|
||||
$labels KEYLIST
|
||||
<%
|
||||
'labelKey' STORE
|
||||
<% $labels $labelKey GET $event $labelKey GET != %>
|
||||
<%
|
||||
false 'passed' STORE
|
||||
BREAK
|
||||
%> IFT
|
||||
%> FOREACH
|
||||
|
||||
<% $passed %>
|
||||
<%
|
||||
$results
|
||||
'objD' $event 'objD' 0 @util/getDefault @util/sumField
|
||||
'sizeD' $event 'sizeD' 0 @util/getDefault @util/sumField
|
||||
'inB' $event 'inB' 0 @util/getDefault @util/sumField
|
||||
'outB' $event 'outB' 0 @util/getDefault @util/sumField
|
||||
|
||||
'ops' GET 'resultOps' STORE
|
||||
$resultOps $event 'op' GET 1 @util/sumField
|
||||
DROP
|
||||
%> IFT
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
|
||||
// $results ->JSON LOGMSG
|
||||
|
||||
// Find all nodes containing corrections
|
||||
'load_corrections' SECTION
|
||||
[
|
|
@ -0,0 +1,36 @@
|
|||
{
|
||||
'name' 'utapi/writeCanary'
|
||||
'desc'
|
||||
<'
|
||||
Write a canary record containing the current timestamp labeled with the nodeId
|
||||
'>
|
||||
'sig' [ [ [ ] [ ] ] ] // Signature
|
||||
'params' {}
|
||||
'examples' [
|
||||
<'
|
||||
@utapi/writeCanary
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
'nodeId' MACROCONFIG 'nodeId' STORE
|
||||
NEWGTS
|
||||
'utapi.canary' RENAME
|
||||
{ 'nodeId' $nodeId } RELABEL
|
||||
NOW NaN NaN NaN true ADDVALUE
|
||||
'writeTokenStatic' UPDATE
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
$macro
|
|
@ -1,160 +0,0 @@
|
|||
{
|
||||
'name' 'utapi/createCheckPoint'
|
||||
'desc'
|
||||
<'
|
||||
Scans recent events and generates multiple checkpoint GTS based on the value of the provided fields
|
||||
'>
|
||||
'sig' [ [ [ 'a:MAP' 'o:MAP' ] [ 'c:LIST[GTS]' ] ] ] // Signature
|
||||
'params' {
|
||||
// Signature params description
|
||||
'a' 'Map containing read/write tokens'
|
||||
'o' 'Map containing operation info'
|
||||
'c' 'List of created checkpoints'
|
||||
}
|
||||
'examples' [
|
||||
<'
|
||||
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
// 'Creating checkpoints' LOGMSG
|
||||
JSON-> 'operation_info' STORE
|
||||
JSON-> 'auth_info' STORE
|
||||
|
||||
$auth_info 'write' GET 'write_token' STORE
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
|
||||
$operation_info 'nodeId' GET 'nodeId' STORE
|
||||
{ 'node' $nodeId } 'filterLabels' STORE
|
||||
|
||||
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
|
||||
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
|
||||
|
||||
// Grab our passed timestamp and store it as the variable `endTimestamp`
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
|
||||
'utapi.checkpoint.master' 'master_checkpoint_class' STORE
|
||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||
'utapi.event' 'metric_class' STORE
|
||||
|
||||
$read_token $master_checkpoint_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master checkpoint
|
||||
FIRSTTICK
|
||||
// If we found a checkpoint, increment its timestamp
|
||||
<% DUP 0 > %>
|
||||
<% 1 + %> IFT
|
||||
'startTimestamp' STORE // Grab our starting timestamp from the last checkpoint (0 if no checkpoints)
|
||||
|
||||
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
||||
// 'Using ' $endTimestamp TOSTRING + ' as endTimestamp' + LOGMSG
|
||||
{} 'results' STORE # Create an empty map for results
|
||||
|
||||
$fieldsToIndex
|
||||
<% // For each field create an empty map in results
|
||||
'field' STORE
|
||||
$results {} $field PUT DROP
|
||||
%> FOREACH
|
||||
|
||||
// Fetch all events since the last checkpoint
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $metric_class
|
||||
'labels' $filterLabels
|
||||
'start' $startTimestamp
|
||||
'end' $endTimestamp
|
||||
} FETCH
|
||||
<% DUP SIZE 0 > %>
|
||||
<%
|
||||
<%
|
||||
<% // Iter over events
|
||||
'event' STORE // Store the event
|
||||
$event 4 GET @utapi/decodeEvent 'decoded' STORE
|
||||
// 'Including event ' $event 0 GET TOSTRING + ' ' + $decoded ->JSON + LOGMSG
|
||||
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
|
||||
$decoded 'op' GET 'operationId' STORE
|
||||
|
||||
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
|
||||
<%
|
||||
'field' STORE // Store the field
|
||||
$decoded $field GET 'fieldValue' STORE // Store the fields value
|
||||
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
|
||||
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
|
||||
<%
|
||||
$fieldResults $fieldValue GET // Grab the existing map for the checkpoint and leave it on the stack
|
||||
%>
|
||||
<%
|
||||
// Push empty checkpoint onto stack
|
||||
{
|
||||
'objD' 0
|
||||
'sizeD' 0
|
||||
'inB' 0
|
||||
'outB' 0
|
||||
'ops' {}
|
||||
} 'fieldValueResults' STORE
|
||||
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
|
||||
$fieldValueResults // Leave it on the stack
|
||||
%> IFTE
|
||||
// Consumes a map off the stack summing the specified field and the passed value
|
||||
// Leaves the modified map on the stack
|
||||
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
|
||||
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
|
||||
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
|
||||
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
|
||||
|
||||
// Grab our op count map
|
||||
'ops' GET 'opsCount' STORE
|
||||
$opsCount $operationId 1 @util/sumField
|
||||
DROP // Drop the returned map from sumField
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
%>
|
||||
<%
|
||||
DROP 0 STOP
|
||||
%> IFTE
|
||||
|
||||
0 'checkpoints' STORE
|
||||
// For each of our indexed fields
|
||||
$results KEYLIST
|
||||
<%
|
||||
'field' STORE
|
||||
$results $field GET 'fieldResults' STORE
|
||||
// For each unique value seen
|
||||
$fieldResults KEYLIST
|
||||
<%
|
||||
'fieldValue' STORE
|
||||
// Encode the results
|
||||
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
|
||||
// 'Created checkpoint ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
|
||||
// And create our GTS
|
||||
NEWGTS $checkpoint_class RENAME
|
||||
$endTimestamp NaN NaN NaN $value ADDVALUE
|
||||
{ 'node' $nodeId $field $fieldValue } RELABEL
|
||||
$write_token UPDATE
|
||||
$checkpoints 1 + 'checkpoints' STORE
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
|
||||
NEWGTS $master_checkpoint_class RENAME
|
||||
$endTimestamp NaN NaN NaN 0 ADDVALUE
|
||||
{ 'node' $nodeId } RELABEL
|
||||
$write_token UPDATE
|
||||
|
||||
$checkpoints // Leave the number of created checkpoints on the stack
|
||||
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
$macro
|
|
@ -1,157 +0,0 @@
|
|||
{
|
||||
'name' 'utapi/repairRecord'
|
||||
'desc'
|
||||
<'
|
||||
|
||||
'>
|
||||
'sig' [ [ [ ] [ ] ] ] // Signature
|
||||
'params' {
|
||||
// Signature params description
|
||||
}
|
||||
'examples' [
|
||||
<'
|
||||
|
||||
'>
|
||||
]
|
||||
} 'info' STORE
|
||||
|
||||
<%
|
||||
!$info INFO
|
||||
SAVE 'context' STORE
|
||||
<%
|
||||
// 'Checking for repairs' LOGMSG
|
||||
JSON-> 'operation_info' STORE
|
||||
JSON-> 'auth_info' STORE
|
||||
|
||||
$auth_info 'write' GET 'write_token' STORE
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
// Grab our passed nodeId, wrap in a map and store it as the variable `filterLabels`
|
||||
$operation_info 'nodeId' GET 'nodeId' STORE
|
||||
{ 'node' $nodeId } 'filterLabels' STORE
|
||||
|
||||
// Grab our passed field names, convert them to a set, and store it as the variable `fieldsToIndex`
|
||||
$operation_info 'fields' GET ->SET 'fieldsToIndex' STORE
|
||||
|
||||
// Grab our passed timestamp and store it as the variable `endTimestamp`
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
|
||||
'utapi.repair.master' 'master_repair_class' STORE
|
||||
'utapi.repair.correction' 'correction_class' STORE
|
||||
'utapi.repair.event' 'metric_class' STORE
|
||||
|
||||
$read_token $master_repair_class $filterLabels $endTimestamp @utapi/fetchFirstRecordBefore // Fetch latest master repair
|
||||
FIRSTTICK
|
||||
// If we found a repair, increment its timestamp so we start at the tick immediatly after
|
||||
<% DUP 0 > %>
|
||||
<% 1 + %> IFT
|
||||
'startTimestamp' STORE // Grab our starting timestamp from the last repair (0 if no repairs)
|
||||
// 'Using ' $startTimestamp TOSTRING + ' as startTimestamp' + LOGMSG
|
||||
|
||||
{} 'results' STORE # Create an empty map for results
|
||||
|
||||
$fieldsToIndex
|
||||
<% // For each field create an empty map in results
|
||||
'field' STORE
|
||||
$results {} $field PUT DROP
|
||||
%> FOREACH
|
||||
|
||||
// Fetch all events since the last repair
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $metric_class
|
||||
'labels' $filterLabels
|
||||
'start' $startTimestamp
|
||||
'end' $endTimestamp
|
||||
} FETCH
|
||||
// STOP
|
||||
<% DUP SIZE 0 > %>
|
||||
<%
|
||||
0 GET
|
||||
<%
|
||||
'event' STORE
|
||||
$event 4 GET @utapi/decodeEvent 'decoded' STORE
|
||||
// 'Including event ' $decoded 'id' GET + LOGMSG
|
||||
$decoded KEYLIST 'eventFields' STORE // Extract and store available event fields
|
||||
$decoded 'op' GET 'operationId' STORE
|
||||
|
||||
$decoded KEYLIST ->SET $fieldsToIndex INTERSECTION
|
||||
<%
|
||||
'field' STORE // Store the field
|
||||
$decoded $field GET 'fieldValue' STORE // Store the fields value
|
||||
$results $field GET 'fieldResults' STORE // Grad the corresponding field map from the results
|
||||
<% $fieldResults $fieldValue CONTAINSKEY SWAP DROP %> // If we've see this fieldValue before
|
||||
<%
|
||||
$fieldResults $fieldValue GET // Grab the existing map for the correction and leave it on the stack
|
||||
%>
|
||||
<%
|
||||
// Push empty correction onto stack
|
||||
{
|
||||
'objD' 0
|
||||
'sizeD' 0
|
||||
'inB' 0
|
||||
'outB' 0
|
||||
'ops' {}
|
||||
} 'fieldValueResults' STORE
|
||||
$fieldResults $fieldValueResults $fieldValue PUT DROP // Add it to our results
|
||||
$fieldValueResults // Leave it on the stack
|
||||
%> IFTE
|
||||
// Consumes a map off the stack summing the specified field and the passed value
|
||||
// Leaves the modified map on the stack
|
||||
'objD' $decoded 'objD' 0 @util/getDefault @util/sumField
|
||||
'sizeD' $decoded 'sizeD' 0 @util/getDefault @util/sumField
|
||||
'inB' $decoded 'inB' 0 @util/getDefault @util/sumField
|
||||
'outB' $decoded 'outB' 0 @util/getDefault @util/sumField
|
||||
|
||||
// Grab our op count map
|
||||
'ops' GET 'opsCount' STORE
|
||||
$opsCount $operationId 1 @util/sumField
|
||||
DROP // Drop the returned map from sumField
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
%>
|
||||
<%
|
||||
DROP 0 STOP
|
||||
%> IFTE
|
||||
|
||||
0 'corrections' STORE
|
||||
$results KEYLIST
|
||||
<%
|
||||
'field' STORE
|
||||
$results $field GET 'fieldResults' STORE
|
||||
// For each unique value seen
|
||||
$fieldResults KEYLIST
|
||||
<%
|
||||
'fieldValue' STORE
|
||||
// Encode the results
|
||||
$fieldResults $fieldValue GET @utapi/encodeRecord 'value' STORE
|
||||
// 'Created correction ' { 'node' $nodeId $field $fieldValue } ->JSON + ' ' + $fieldResults $fieldValue GET ->JSON + LOGMSG
|
||||
// And create our GTS
|
||||
NEWGTS $correction_class RENAME
|
||||
$endTimestamp NaN NaN NaN $value ADDVALUE
|
||||
{ 'node' $nodeId $field $fieldValue } RELABEL
|
||||
$write_token UPDATE
|
||||
$corrections 1 + 'corrections' STORE
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
|
||||
NEWGTS $master_repair_class RENAME
|
||||
$endTimestamp NaN NaN NaN 0 ADDVALUE
|
||||
{ 'node' $nodeId } RELABEL
|
||||
$write_token UPDATE
|
||||
|
||||
$corrections // Leave the number of created corrections on the stack
|
||||
|
||||
%>
|
||||
<% // catch any exception
|
||||
RETHROW
|
||||
%>
|
||||
<% // finally, restore the context
|
||||
$context RESTORE
|
||||
%> TRY
|
||||
%>
|
||||
'macro' STORE
|
||||
|
||||
// Unit tests
|
||||
|
||||
$macro
|
Loading…
Reference in New Issue