Compare commits
2 Commits
13351ddcd8
...
38e6e6660f
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 38e6e6660f | |
Taylor McKinnon | b59487da8f |
|
@ -1,12 +1,37 @@
|
|||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||
const { LoggerContext } = require('../utils');
|
||||
const { LoggerContext, comprehend } = require('../utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'CreateCheckpoint',
|
||||
});
|
||||
|
||||
function _updateCheckpoint(checkpoint, metric) {
|
||||
return {
|
||||
objD: checkpoint.objD + metric.objD,
|
||||
sizeD: checkpoint.sizeD + metric.sizeD,
|
||||
inB: checkpoint.inB + metric.inB,
|
||||
outB: checkpoint.outB + metric.outB,
|
||||
};
|
||||
}
|
||||
|
||||
function _checkpointFactory(labels) {
|
||||
const checkpoints = comprehend(labels, key => ({ key, value: {} }));
|
||||
return {
|
||||
update: metric => {
|
||||
labels
|
||||
.filter(label => !!metric[label])
|
||||
.forEach(label => {
|
||||
const value = metric[label];
|
||||
const checkpoint = checkpoints[label][value];
|
||||
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||
});
|
||||
},
|
||||
checkpoints: () => (checkpoints),
|
||||
};
|
||||
}
|
||||
|
||||
class CreateCheckpoint extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
/* eslint-disable no-restricted-globals */
|
||||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const { UtapiMetric } = require('../models');
|
||||
const { UtapiRecord } = require('../models');
|
||||
|
||||
const config = require('../config');
|
||||
const { checkpointLagSecs } = require('../constants');
|
||||
const { checkpointLagSecs, warp10RecordType, eventFieldsToWarp10 } = require('../constants');
|
||||
const {
|
||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, comprehend,
|
||||
} = require('../utils');
|
||||
const { warp10 } = require('../config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'IngestShard',
|
||||
|
@ -14,6 +18,43 @@ const logger = new LoggerContext({
|
|||
|
||||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||
|
||||
function orZero(value) {
|
||||
return value || 0;
|
||||
}
|
||||
|
||||
function _updateCheckpoint(checkpoint, metric) {
|
||||
const ops = checkpoint.operations || {};
|
||||
return {
|
||||
objectDelta: orZero(checkpoint.objectDelta) + orZero(metric.objectDelta),
|
||||
sizeDelta: orZero(checkpoint.sizeDelta) + orZero(metric.sizeDelta),
|
||||
incomingBytes: orZero(checkpoint.incomingBytes) + orZero(metric.incomingBytes),
|
||||
outgoingBytes: orZero(checkpoint.outgoingBytes) + orZero(metric.outgoingBytes),
|
||||
operations: { ...ops, [metric.operationId]: (ops[metric.operationId] || 0) + 1 },
|
||||
};
|
||||
}
|
||||
|
||||
function _checkpointFactory(labels) {
|
||||
const checkpoints = comprehend(labels, (_, key) => ({ key, value: {} }));
|
||||
let oldest = NaN;
|
||||
let newest = NaN;
|
||||
return {
|
||||
update: metric => {
|
||||
oldest = metric.timestamp < oldest || isNaN(oldest) ? metric.timestamp : oldest;
|
||||
newest = metric.timestamp > newest || isNaN(newest) ? metric.timestamp : newest;
|
||||
labels
|
||||
.filter(label => !!metric[label])
|
||||
.forEach(label => {
|
||||
const value = metric[label];
|
||||
const checkpoint = checkpoints[label][value] || {};
|
||||
checkpoints[label][value] = _updateCheckpoint(checkpoint, metric);
|
||||
});
|
||||
},
|
||||
checkpoints: () => (checkpoints),
|
||||
oldest: () => (oldest),
|
||||
newest: () => (newest),
|
||||
};
|
||||
}
|
||||
|
||||
class IngestShardTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
|
@ -54,36 +95,88 @@ class IngestShardTask extends BaseTask {
|
|||
if (metrics.length > 0) {
|
||||
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
|
||||
const shardAge = now() - shard;
|
||||
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
|
||||
const areSlowEvents = false; //shardAge >= checkpointLagMicroseconds;
|
||||
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
|
||||
|
||||
if (areSlowEvents) {
|
||||
logger.info('Detected slow records, ingesting as repair');
|
||||
}
|
||||
|
||||
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
|
||||
const factory = _checkpointFactory(['bucket', 'account']);
|
||||
|
||||
records.sort((a, b) => a.timestamp - b.timestamp);
|
||||
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents)).forEach(factory.update);
|
||||
|
||||
const clock = new InterpolatedClock();
|
||||
records.forEach(r => {
|
||||
r.timestamp = clock.getTs(r.timestamp);
|
||||
// console.log(JSON.stringi?fy(factory.checkpoints(), null, 4));
|
||||
console.log(factory.newest());
|
||||
// records.sort((a, b) => a.timestamp - b.timestamp);
|
||||
|
||||
// const clock = new InterpolatedClock();
|
||||
// records.forEach(r => {
|
||||
// r.timestamp = clock.getTs(r.timestamp);
|
||||
// });
|
||||
|
||||
const checkpointTimestamp = factory.newest();
|
||||
const checkpoints = [];
|
||||
Object.entries(factory.checkpoints())
|
||||
.forEach(([level, chkpts]) => {
|
||||
Object.entries(chkpts).forEach(([resource, checkpoint]) => {
|
||||
const record = new UtapiRecord({
|
||||
...checkpoint,
|
||||
timestamp: checkpointTimestamp,
|
||||
});
|
||||
|
||||
checkpoints.push({
|
||||
level,
|
||||
resource,
|
||||
data: new UtapiRecord({
|
||||
...checkpoint,
|
||||
timestamp: checkpointTimestamp,
|
||||
}),
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
await async.mapLimit(checkpoints, 10,
|
||||
async checkpoint => {
|
||||
let ingestedIntoNodeId;
|
||||
const status = await this.withWarp10(async warp10 => {
|
||||
// eslint-disable-next-line prefer-destructuring
|
||||
ingestedIntoNodeId = warp10.nodeId;
|
||||
return warp10.ingest(
|
||||
{
|
||||
className: 'utapi.checkpoint',
|
||||
labels: {
|
||||
origin: config.nodeId,
|
||||
[eventFieldsToWarp10[checkpoint.level]]: checkpoint.resource,
|
||||
},
|
||||
valueType: warp10RecordType,
|
||||
}, [checkpoint.data],
|
||||
);
|
||||
});
|
||||
// logger.info(
|
||||
// `ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`,
|
||||
// );
|
||||
});
|
||||
|
||||
|
||||
let ingestedIntoNodeId;
|
||||
const status = await this.withWarp10(async warp10 => {
|
||||
// eslint-disable-next-line prefer-destructuring
|
||||
ingestedIntoNodeId = warp10.nodeId;
|
||||
return warp10.ingest(
|
||||
{
|
||||
className: metricClass,
|
||||
className: 'utapi.checkpoint.master',
|
||||
labels: { origin: config.nodeId },
|
||||
}, records,
|
||||
valueType: warp10RecordType,
|
||||
|
||||
}, [new UtapiRecord({
|
||||
timestamp: checkpointTimestamp,
|
||||
})],
|
||||
);
|
||||
});
|
||||
assert.strictEqual(status, records.length);
|
||||
// assert.strictEqual(status, records.length);
|
||||
await this._cache.deleteShard(shard);
|
||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||
// logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||
} else {
|
||||
logger.debug('No events found in shard, cleaning up');
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@
|
|||
".app"
|
||||
".producer"
|
||||
".owner"
|
||||
"origin"
|
||||
] ->SET 'ignoredLabels' STORE
|
||||
|
||||
// Search for available snapshots
|
||||
|
|
Loading…
Reference in New Issue