Compare commits

...

2 Commits

Author SHA1 Message Date
Taylor McKinnon 38e6e6660f add working version 2021-07-09 11:15:52 -07:00
Taylor McKinnon b59487da8f stash 2021-06-25 10:05:02 -07:00
3 changed files with 133 additions and 14 deletions

View File

@ -1,12 +1,37 @@
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
const { LoggerContext } = require('../utils');
const { LoggerContext, comprehend } = require('../utils');
const logger = new LoggerContext({
module: 'CreateCheckpoint',
});
function _updateCheckpoint(checkpoint, metric) {
return {
objD: checkpoint.objD + metric.objD,
sizeD: checkpoint.sizeD + metric.sizeD,
inB: checkpoint.inB + metric.inB,
outB: checkpoint.outB + metric.outB,
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, key => ({ key, value: {} }));
return {
update: metric => {
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value];
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
};
}
class CreateCheckpoint extends BaseTask {
constructor(options) {
super(options);

View File

@ -1,12 +1,16 @@
/* eslint-disable no-restricted-globals */
const assert = require('assert');
const async = require('async');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const { UtapiRecord } = require('../models');
const config = require('../config');
const { checkpointLagSecs } = require('../constants');
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
} = require('../utils');
const { warp10 } = require('../config');
const logger = new LoggerContext({
module: 'IngestShard',
@ -14,6 +18,43 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
function orZero(value) {
return value || 0;
}
function _updateCheckpoint(checkpoint, metric) {
const ops = checkpoint.operations || {};
return {
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
};
}
function _checkpointFactory(labels) {
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
let oldest = NaN;
let newest = NaN;
return {
update: metric => {
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
labels
.filter(label => !!metric[label])
.forEach(label => {
const value = metric[label];
const checkpoint = checkpoints[label][value] || {};
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
});
},
checkpoints: () => (checkpoints),
oldest: () => (oldest),
newest: () => (newest),
};
}
class IngestShardTask extends BaseTask {
constructor(options) {
super(options);
@ -54,21 +95,69 @@ class IngestShardTask extends BaseTask {
if (metrics.length > 0) {
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
const areSlowEvents = false; //shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
if (areSlowEvents) {
logger.info('Detected slow records, ingesting as repair');
}
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
const factory = _checkpointFactory(['bucket', 'account']);
records.sort((a, b) => a.timestamp - b.timestamp);
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents)).forEach(factory.update);
// console.log(JSON.stringi?fy(factory.checkpoints(), null, 4));
console.log(factory.newest());
// records.sort((a, b) => a.timestamp - b.timestamp);
// const clock = new InterpolatedClock();
// records.forEach(r => {
// r.timestamp = clock.getTs(r.timestamp);
// });
const checkpointTimestamp = factory.newest();
const checkpoints = [];
Object.entries(factory.checkpoints())
.forEach(([level, chkpts]) => {
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
const record = new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
});
checkpoints.push({
level,
resource,
data: new UtapiRecord({
...checkpoint,
timestamp: checkpointTimestamp,
}),
});
});
});
await async.mapLimit(checkpoints, 10,
async checkpoint => {
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
className: 'utapi.checkpoint',
labels: {
origin: config.nodeId,
[eventFieldsToWarp10[checkpoint.level]]: checkpoint.resource,
},
valueType: warp10RecordType,
}, [checkpoint.data],
);
});
// logger.info(
// `ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`,
// );
});
const clock = new InterpolatedClock();
records.forEach(r => {
r.timestamp = clock.getTs(r.timestamp);
});
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
@ -76,14 +165,18 @@ class IngestShardTask extends BaseTask {
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
className: metricClass,
className: 'utapi.checkpoint.master',
labels: { origin: config.nodeId },
}, records,
valueType: warp10RecordType,
}, [new UtapiRecord({
timestamp: checkpointTimestamp,
})],
);
});
assert.strictEqual(status, records.length);
// assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
// logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
} else {
logger.debug('No events found in shard, cleaning up');
}

View File

@ -54,6 +54,7 @@
".app"
".producer"
".owner"
"origin"
] ->SET 'ignoredLabels' STORE
// Search for available snapshots