Compare commits

...

2 Commits

Author SHA1 Message Date
Taylor McKinnon 38e6e6660f add working version 2021-07-09 11:15:52 -07:00
Taylor McKinnon b59487da8f stash 2021-06-25 10:05:02 -07:00
3 changed files with 133 additions and 14 deletions

View File

@ -1,12 +1,37 @@
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants'); const { checkpointLagSecs, indexedEventFields } = require('../constants');
const { LoggerContext } = require('../utils'); const { LoggerContext, comprehend } = require('../utils');
const logger = new LoggerContext({ const logger = new LoggerContext({
module: 'CreateCheckpoint', module: 'CreateCheckpoint',
}); });
function _updateCheckpoint(checkpoint, metric) {
return {
objD: checkpoint.objD + metric.objD,
sizeD: checkpoint.sizeD + metric.sizeD,
inB: checkpoint.inB + metric.inB,
outB: checkpoint.outB + metric.outB,
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, key => ({ key, value: {} }));
return {
update: metric => {
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value];
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
};
}
class CreateCheckpoint extends BaseTask { class CreateCheckpoint extends BaseTask {
constructor(options) { constructor(options) {
super(options); super(options);

View File

@ -1,12 +1,16 @@
/* eslint-disable no-restricted-globals */
const assert = require('assert'); const assert = require('assert');
const async = require('async'); const async = require('async');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models'); const { UtapiMetric } = require('../models');
const { UtapiRecord } = require('../models');
const config = require('../config'); const config = require('../config');
const { checkpointLagSecs } = require('../constants'); const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
const { const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
} = require('../utils'); } = require('../utils');
const { warp10 } = require('../config');
const logger = new LoggerContext({ const logger = new LoggerContext({
module: 'IngestShard', module: 'IngestShard',
@ -14,6 +18,43 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs); const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
function orZero(value) {
return value || 0;
}
function _updateCheckpoint(checkpoint, metric) {
const ops = checkpoint.operations || {};
return {
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
let oldest = NaN;
let newest = NaN;
return {
update: metric => {
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value] || {};
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
oldest: () => (oldest),
newest: () => (newest),
};
}
class IngestShardTask extends BaseTask { class IngestShardTask extends BaseTask {
constructor(options) { constructor(options) {
super(options); super(options);
@ -54,36 +95,88 @@ class IngestShardTask extends BaseTask {
if (metrics.length > 0) { if (metrics.length > 0) {
logger.info(`Ingesting ${metrics.length} events from shard`, { shard }); logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
const shardAge = now() - shard; const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds; const areSlowEvents = false; //shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event'; const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
if (areSlowEvents) { if (areSlowEvents) {
logger.info('Detected slow records, ingesting as repair'); logger.info('Detected slow records, ingesting as repair');
} }
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents)); const factory = _checkpointFactory(['bucket', 'account']);
records.sort((a, b) => a.timestamp - b.timestamp); const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents)).forEach(factory.update);
const clock = new InterpolatedClock(); // console.log(JSON.stringi?fy(factory.checkpoints(), null, 4));
records.forEach(r => { console.log(factory.newest());
r.timestamp = clock.getTs(r.timestamp); // records.sort((a, b) => a.timestamp - b.timestamp);
// const clock = new InterpolatedClock();
// records.forEach(r => {
// r.timestamp = clock.getTs(r.timestamp);
// });
const checkpointTimestamp = factory.newest();
const checkpoints = [];
Object.entries(factory.checkpoints())
.forEach(([level, chkpts]) => {
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
const record = new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
}); });
checkpoints.push({
level,
resource,
data: new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
}),
});
});
});
await async.mapLimit(checkpoints, 10,
async checkpoint => {
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
className: 'utapi.checkpoint',
labels: {
origin: config.nodeId,
[eventFieldsToWarp10[checkpoint.level]]: checkpoint.resource,
},
valueType: warp10RecordType,
}, [checkpoint.data],
);
});
// logger.info(
// `ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`,
// );
});
let ingestedIntoNodeId; let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => { const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring // eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId; ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest( return warp10.ingest(
{ {
className: metricClass, className: 'utapi.checkpoint.master',
labels: { origin: config.nodeId }, labels: { origin: config.nodeId },
}, records, valueType: warp10RecordType,
}, [new UtapiRecord({
timestamp: checkpointTimestamp,
})],
); );
}); });
assert.strictEqual(status, records.length); // assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard); await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`); // logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
} else { } else {
logger.debug('No events found in shard, cleaning up'); logger.debug('No events found in shard, cleaning up');
} }

View File

@ -54,6 +54,7 @@
".app" ".app"
".producer" ".producer"
".owner" ".owner"
"origin"
] ->SET 'ignoredLabels' STORE ] ->SET 'ignoredLabels' STORE
// Search for available snapshots // Search for available snapshots