Compare commits
2 Commits
13351ddcd8
...
38e6e6660f
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 38e6e6660f | |
Taylor McKinnon | b59487da8f |
|
@ -1,12 +1,37 @@
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||||
const { LoggerContext } = require('../utils');
|
const { LoggerContext, comprehend } = require('../utils');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
module: 'CreateCheckpoint',
|
module: 'CreateCheckpoint',
|
||||||
});
|
});
|
||||||
|
|
||||||
|
function _updateCheckpoint(checkpoint, metric) {
|
||||||
|
return {
|
||||||
|
objD: checkpoint.objD + metric.objD,
|
||||||
|
sizeD: checkpoint.sizeD + metric.sizeD,
|
||||||
|
inB: checkpoint.inB + metric.inB,
|
||||||
|
outB: checkpoint.outB + metric.outB,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function _checkpointFactory(labels) {
|
||||||
|
const checkpoints = comprehend(labels, key => ({ key, value: {} }));
|
||||||
|
return {
|
||||||
|
update: metric => {
|
||||||
|
labels
|
||||||
|
.filter(label => !!metric[label])
|
||||||
|
.forEach(label => {
|
||||||
|
const value = metric[label];
|
||||||
|
const checkpoint = checkpoints[label][value];
|
||||||
|
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
checkpoints: () => (checkpoints),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
class CreateCheckpoint extends BaseTask {
|
class CreateCheckpoint extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super(options);
|
||||||
|
|
|
@ -1,12 +1,16 @@
|
||||||
|
/* eslint-disable no-restricted-globals */
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const { UtapiMetric } = require('../models');
|
const { UtapiMetric } = require('../models');
|
||||||
|
const { UtapiRecord } = require('../models');
|
||||||
|
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { checkpointLagSecs } = require('../constants');
|
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
|
||||||
const {
|
const {
|
||||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
|
||||||
} = require('../utils');
|
} = require('../utils');
|
||||||
|
const { warp10 } = require('../config');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
module: 'IngestShard',
|
module: 'IngestShard',
|
||||||
|
@ -14,6 +18,43 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||||
|
|
||||||
|
function orZero(value) {
|
||||||
|
return value || 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
function _updateCheckpoint(checkpoint, metric) {
|
||||||
|
const ops = checkpoint.operations || {};
|
||||||
|
return {
|
||||||
|
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
|
||||||
|
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
|
||||||
|
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
|
||||||
|
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
|
||||||
|
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function _checkpointFactory(labels) {
|
||||||
|
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
|
||||||
|
let oldest = NaN;
|
||||||
|
let newest = NaN;
|
||||||
|
return {
|
||||||
|
update: metric => {
|
||||||
|
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
|
||||||
|
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
|
||||||
|
labels
|
||||||
|
.filter(label => !!metric[label])
|
||||||
|
.forEach(label => {
|
||||||
|
const value = metric[label];
|
||||||
|
const checkpoint = checkpoints[label][value] || {};
|
||||||
|
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
checkpoints: () => (checkpoints),
|
||||||
|
oldest: () => (oldest),
|
||||||
|
newest: () => (newest),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
class IngestShardTask extends BaseTask {
|
class IngestShardTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super(options);
|
||||||
|
@ -54,36 +95,88 @@ class IngestShardTask extends BaseTask {
|
||||||
if (metrics.length > 0) {
|
if (metrics.length > 0) {
|
||||||
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
|
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
|
||||||
const shardAge = now() - shard;
|
const shardAge = now() - shard;
|
||||||
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
|
const areSlowEvents = false; //shardAge >= checkpointLagMicroseconds;
|
||||||
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
|
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
|
||||||
|
|
||||||
if (areSlowEvents) {
|
if (areSlowEvents) {
|
||||||
logger.info('Detected slow records, ingesting as repair');
|
logger.info('Detected slow records, ingesting as repair');
|
||||||
}
|
}
|
||||||
|
|
||||||
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
|
const factory = _checkpointFactory(['bucket', 'account']);
|
||||||
|
|
||||||
records.sort((a, b) => a.timestamp - b.timestamp);
|
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents)).forEach(factory.update);
|
||||||
|
|
||||||
const clock = new InterpolatedClock();
|
// console.log(JSON.stringi?fy(factory.checkpoints(), null, 4));
|
||||||
records.forEach(r => {
|
console.log(factory.newest());
|
||||||
r.timestamp = clock.getTs(r.timestamp);
|
// records.sort((a, b) => a.timestamp - b.timestamp);
|
||||||
|
|
||||||
|
// const clock = new InterpolatedClock();
|
||||||
|
// records.forEach(r => {
|
||||||
|
// r.timestamp = clock.getTs(r.timestamp);
|
||||||
|
// });
|
||||||
|
|
||||||
|
const checkpointTimestamp = factory.newest();
|
||||||
|
const checkpoints = [];
|
||||||
|
Object.entries(factory.checkpoints())
|
||||||
|
.forEach(([level, chkpts]) => {
|
||||||
|
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
|
||||||
|
const record = new UtapiRecord({
|
||||||
|
...checkpoint,
|
||||||
|
timestamp: checkpointTimestamp,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
checkpoints.push({
|
||||||
|
level,
|
||||||
|
resource,
|
||||||
|
data: new UtapiRecord({
|
||||||
|
...checkpoint,
|
||||||
|
timestamp: checkpointTimestamp,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
await async.mapLimit(checkpoints, 10,
|
||||||
|
async checkpoint => {
|
||||||
|
let ingestedIntoNodeId;
|
||||||
|
const status = await this.withWarp10(async warp10 => {
|
||||||
|
// eslint-disable-next-line prefer-destructuring
|
||||||
|
ingestedIntoNodeId = warp10.nodeId;
|
||||||
|
return warp10.ingest(
|
||||||
|
{
|
||||||
|
className: 'utapi.checkpoint',
|
||||||
|
labels: {
|
||||||
|
origin: config.nodeId,
|
||||||
|
[eventFieldsToWarp10[checkpoint.level]]: checkpoint.resource,
|
||||||
|
},
|
||||||
|
valueType: warp10RecordType,
|
||||||
|
}, [checkpoint.data],
|
||||||
|
);
|
||||||
|
});
|
||||||
|
// logger.info(
|
||||||
|
// `ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`,
|
||||||
|
// );
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
let ingestedIntoNodeId;
|
let ingestedIntoNodeId;
|
||||||
const status = await this.withWarp10(async warp10 => {
|
const status = await this.withWarp10(async warp10 => {
|
||||||
// eslint-disable-next-line prefer-destructuring
|
// eslint-disable-next-line prefer-destructuring
|
||||||
ingestedIntoNodeId = warp10.nodeId;
|
ingestedIntoNodeId = warp10.nodeId;
|
||||||
return warp10.ingest(
|
return warp10.ingest(
|
||||||
{
|
{
|
||||||
className: metricClass,
|
className: 'utapi.checkpoint.master',
|
||||||
labels: { origin: config.nodeId },
|
labels: { origin: config.nodeId },
|
||||||
}, records,
|
valueType: warp10RecordType,
|
||||||
|
|
||||||
|
}, [new UtapiRecord({
|
||||||
|
timestamp: checkpointTimestamp,
|
||||||
|
})],
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
assert.strictEqual(status, records.length);
|
// assert.strictEqual(status, records.length);
|
||||||
await this._cache.deleteShard(shard);
|
await this._cache.deleteShard(shard);
|
||||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
// logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||||
} else {
|
} else {
|
||||||
logger.debug('No events found in shard, cleaning up');
|
logger.debug('No events found in shard, cleaning up');
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,7 @@
|
||||||
".app"
|
".app"
|
||||||
".producer"
|
".producer"
|
||||||
".owner"
|
".owner"
|
||||||
|
"origin"
|
||||||
] ->SET 'ignoredLabels' STORE
|
] ->SET 'ignoredLabels' STORE
|
||||||
|
|
||||||
// Search for available snapshots
|
// Search for available snapshots
|
||||||
|
|
Loading…
Reference in New Issue