Compare commits

...

4 Commits

Author SHA1 Message Date
Taylor McKinnon 88982fffe6 utapi changes to break out warp10 2022-05-05 10:34:22 -07:00
Taylor McKinnon 67ffa99861 ft(UTAPI-69): Add metrics for async tasks 2022-05-05 10:26:35 -07:00
Taylor McKinnon 3993acade1 metric name 2022-05-05 09:48:14 -07:00
Taylor McKinnon 93ea97334d ft(UTAPI-67): Add metrics framework to BaseTask 2022-05-04 11:43:49 -07:00
16 changed files with 421 additions and 23 deletions

View File

@ -27,7 +27,7 @@ x-models:
services:
redis-0:
image: redis:5
image: redis:6
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
@ -35,7 +35,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:5
image: redis:6
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
@ -43,7 +43,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:5
image: redis:6
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379

View File

@ -54,5 +54,16 @@
"filter": {
"allow": {},
"deny": {}
},
"metrics" : {
"enabled": false,
"host": "localhost",
"serverPort": 10901,
"ingestPort": 10902,
"checkpointPort": 10903,
"snapshotPort": 10904,
"diskUsagePort": 10905,
"reindexPort": 10906,
"repairPort": 10907
}
}

View File

@ -375,6 +375,18 @@ class Config {
parsedConfig.filter = Config._parseResourceFilters(config.filter);
parsedConfig.metrics = {
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
serverPort: _loadFromEnv('METRICS_PORT_SERVER', config.metrics.serverPort, _typeCasts.int),
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
};
return parsedConfig;
}

View File

@ -115,7 +115,17 @@ const schema = Joi.object({
return filterObj;
}, {},
)),
metrics: {
enabled: Joi.boolean(),
host: Joi.string(),
serverPort: Joi.number().port(),
ingestPort: Joi.number().port(),
checkpointPort: Joi.number().port(),
snapshotPort: Joi.number().port(),
diskUsagePort: Joi.number().port(),
reindexPort: Joi.number().port(),
repairPort: Joi.number().port(),
},
});
module.exports = schema;

View File

@ -1,10 +1,12 @@
const assert = require('assert');
const cron = require('node-schedule');
const cronparser = require('cron-parser');
const promClient = require('prom-client');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { client: cacheClient } = require('../cache');
const Process = require('../process');
const { LoggerContext, iterIfError } = require('../utils');
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
const logger = new LoggerContext({
module: 'BaseTask',
@ -22,6 +24,11 @@ class BaseTask extends Process {
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
this._enableMetrics = options.enableMetrics || false;
this._metricsHost = options.metricsHost || 'localhost';
this._metricsPort = options.metricsPort || 9001;
this._metricsHandlers = null;
this._probeServer = null;
}
async _setup(includeDefaultOpts = true) {
@ -39,6 +46,70 @@ class BaseTask extends Process {
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
.option('-n, --node-id <id>', 'Set a custom node id');
}
if (this._enableMetrics) {
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),
};
await this._createProbeServer();
}
}
_registerDefaultMetricHandlers() {
const taskName = this.constructor.name;
// Get the name of our subclass in snake case format eg BaseClass => _base_class
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `utapi${taskNameSnake}_attempts_total`,
help: `Number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `utapi${taskNameSnake}_failures_total`,
help: `Number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
return {
executionDuration,
executionAttempts,
executionFailures,
};
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
return {};
}
async _createProbeServer() {
this._probeServer = await startProbeServer({
host: this._metricsHost,
port: this._metricsPort,
});
this._probeServer.addHandler(
DEFAULT_METRICS_ROUTE,
(res, log) => {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': promClient.register.contentType,
});
promClient.register.metrics().then(metrics => {
res.end(metrics);
});
},
);
}
get schedule() {
@ -79,12 +150,23 @@ class BaseTask extends Process {
}
async execute() {
let endTimer;
if (this._enableMetrics) {
endTimer = this._metricsHandlers.executionDuration.startTimer();
this._metricsHandlers.executionAttempts.inc(1);
}
try {
const timestamp = new Date() * 1000; // Timestamp in microseconds;
const laggedTimestamp = timestamp - (this.lag * 1000000);
await this._execute(laggedTimestamp);
} catch (error) {
logger.error('Error during task execution', { error });
this._metricsHandlers.executionFailures.inc(1);
}
if (this._enableMetrics) {
endTimer();
}
}
@ -94,6 +176,9 @@ class BaseTask extends Process {
}
async _join() {
if (this._probeServer !== null) {
this._probeServer.stop();
}
return this._cache.disconnect();
}

View File

@ -1,3 +1,4 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
@ -9,11 +10,30 @@ const logger = new LoggerContext({
class CreateCheckpoint extends BaseTask {
constructor(options) {
super(options);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.checkpointPort,
});
this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 'utapi_create_checkpoint_created_total',
help: 'Number of checkpoints created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
@ -29,6 +49,7 @@ class CreateCheckpoint extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`);
this._metricsHandlers.created.inc(status.result[0]);
}
}
}

View File

@ -1,3 +1,4 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { snapshotLagSecs } = require('../constants');
@ -9,11 +10,30 @@ const logger = new LoggerContext({
class CreateSnapshot extends BaseTask {
constructor(options) {
super(options);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.snapshotPort,
});
this._defaultSchedule = config.snapshotSchedule;
this._defaultLag = snapshotLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 'utapi_create_snapshot_created_total',
help: 'Number of snapshots created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
@ -29,6 +49,7 @@ class CreateSnapshot extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} snapshots`);
this._metricsHandlers.created.inc(status.result[0]);
}
}
}

View File

@ -1,4 +1,6 @@
const async = require('async');
const Path = require('path');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { expirationChunkDuration } = require('../constants');
@ -16,9 +18,13 @@ const ACTION_THRESHOLD = 0.95;
class MonitorDiskUsage extends BaseTask {
constructor(options) {
super(
options,
);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.diskUsagePort,
});
this._defaultSchedule = config.diskUsageSchedule;
this._defaultLag = 0;
this._path = config.diskUsage.path;
@ -42,6 +48,39 @@ class MonitorDiskUsage extends BaseTask {
);
}
_registerMetricHandlers() {
const isLocked = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_is_locked',
help: 'Indicates whether the monitored warp 10 has had writes disabled',
labelNames: ['origin', 'containerName'],
});
const leveldbBytes = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10',
labelNames: ['origin', 'containerName'],
});
const hardLimitRatio = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
help: 'Percent of the hard limit used by warp 10',
labelNames: ['origin', 'containerName'],
});
return {
isLocked,
leveldbBytes,
datalogBytes,
hardLimitRatio,
};
}
get isLeader() {
return this._program.leader !== undefined;
}
@ -54,9 +93,9 @@ class MonitorDiskUsage extends BaseTask {
return this._program.lock !== undefined;
}
_getUsage() {
moduleLogger.debug(`calculating disk usage for ${this._path}`);
return getFolderSize(this._path);
_getUsage(path) {
moduleLogger.debug(`calculating disk usage for ${path}`);
return getFolderSize(path);
}
async _expireMetrics(timestamp) {
@ -113,6 +152,8 @@ class MonitorDiskUsage extends BaseTask {
nodeId,
});
this._metricsHandlers.hardLimitRatio.set(hardPercentage)
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
if (hardPercentage < WARN_THRESHOLD) {
@ -150,12 +191,14 @@ class MonitorDiskUsage extends BaseTask {
if (this.isManualUnlock) {
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
await this._enableWarp10Updates();
this._metricsHandlers.isLocked.set(0)
return;
}
if (this.isManualLock) {
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates();
this._metricsHandlers.isLocked.set(1)
return;
}
@ -170,14 +213,20 @@ class MonitorDiskUsage extends BaseTask {
return;
}
let size = null;
let leveldbBytes = null;
let datalogBytes = null
try {
size = await this._getUsage();
leveldbBytes = await this._getUsage(Path.join(this._path, 'data', 'leveldb'));
datalogBytes = await this._getUsage(Path.join(this._path, 'data', 'datalog'));
} catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return;
}
this._metricsHandlers.leveldbBytes.set(leveldbBytes);
this._metricsHandlers.datalogBytes.set(datalogBytes);
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
@ -185,10 +234,12 @@ class MonitorDiskUsage extends BaseTask {
if (shouldLock) {
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates();
this._metricsHandlers.isLocked.set(1)
} else {
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
{ nodeId: this.nodeId });
await this._enableWarp10Updates();
this._metricsHandlers.isLocked.set(0)
}
}
}

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const async = require('async');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const config = require('../config');
@ -16,12 +17,38 @@ const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask {
constructor(options) {
super(options);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.ingestPort,
});
this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const ingestedTotal = new promClient.Counter({
name: 'utapi_ingest_shard_task_ingest_total',
help: 'Number of metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedSlow = new promClient.Counter({
name: 'utapi_ingest_shard_task_slow_total',
help: 'Number of slow metrics ingested',
labelNames: ['origin', 'containerName'],
});
return {
ingestedTotal,
ingestedSlow,
};
}
_hydrateEvent(data, stripTimestamp = false) {
const event = JSON.parse(data);
if (this._stripEventUUID) {
@ -84,6 +111,11 @@ class IngestShardTask extends BaseTask {
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
this._metricsHandlers.ingestedTotal.inc(records.length);
if (areSlowEvents) {
this._metricsHandlers.ingestedSlow.inc(records.length);
}
} else {
logger.debug('No events found in shard, cleaning up');
}

View File

@ -20,7 +20,13 @@ const logger = new LoggerContext({
class ReindexTask extends BaseTask {
constructor(options) {
super(options);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.reindexPort,
});
this._defaultSchedule = config.reindexSchedule;
this._defaultLag = 0;
const eventFilters = (config && config.filter) || {};

View File

@ -1,3 +1,4 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
@ -9,11 +10,30 @@ const logger = new LoggerContext({
class RepairTask extends BaseTask {
constructor(options) {
super(options);
super({
...options,
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.repairPort,
});
this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 'utapi_repair_task_created_total',
help: 'Number of repair records created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
@ -30,6 +50,7 @@ class RepairTask extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
this._metricsHandlers.created.inc(status.result[0]);
}
}
}

View File

@ -4,6 +4,7 @@ const timestamp = require('./timestamp');
const func = require('./func');
const disk = require('./disk');
const filter = require('./filter');
const probe = require('./probe');
module.exports = {
...log,
@ -12,4 +13,5 @@ module.exports = {
...func,
...disk,
...filter,
...probe,
};

32
libV2/utils/probe.js Normal file
View File

@ -0,0 +1,32 @@
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
/**
* Configure probe servers
* @typedef {Object} ProbeServerConfig
* @property {string} bindAddress - Address to bind probe server to
* @property {number} port - Port to bind probe server to
*/
/**
* Start an empty probe server
* @async
* @param {ProbeServerConfig} config - Configuration for probe server
* @returns {Promise<ProbeServer>} - Instance of ProbeServer
*/
async function startProbeServer(config) {
if (!config) {
throw new Error('configuration for probe server is missing');
}
return new Promise((resolve, reject) => {
const probeServer = new ProbeServer(config);
probeServer.onListening(() => resolve(probeServer));
probeServer.onError(err => reject(err));
probeServer.start();
});
}
module.exports = {
startProbeServer,
};

View File

@ -0,0 +1,83 @@
const assert = require('assert');
const needle = require('needle');
const promClient = require('prom-client');
const sinon = require('sinon');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { BaseTask } = require('../../../../libV2/tasks');
const { clients: warp10Clients } = require('../../../../libV2/warp10');
const { getMetricValues } = require('../../../utils/prom');
const METRICS_SERVER_PORT = 10999;
class CustomTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const foo = new promClient.Gauge({
name: 'utapi_custom_task_foo_total',
help: 'Count of foos',
labelNames: ['origin', 'containerName'],
});
return { foo };
}
async _execute() {
this._metricsHandlers.foo.inc(1);
}
}
describe('Test BaseTask metrics', () => {
let task;
beforeEach(async () => {
task = new CustomTask({
enableMetrics: true,
metricsPort: METRICS_SERVER_PORT,
warp10: [warp10Clients[0]],
});
await task.setup();
});
afterEach(async () => {
task.join();
promClient.register.clear();
});
it('should start a metrics server on the provided port', async () => {
const res = await needle(
'get',
`http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
);
const lines = res.body.split('\n');
const first = lines[0];
assert.strictEqual(res.statusCode, 200);
assert(first.startsWith('# HELP'));
});
it('should push metrics for a task execution', async () => {
await task.execute();
const timeValues = await getMetricValues('utapi_custom_task_execution_seconds');
assert.strictEqual(timeValues.length, 1);
const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, []);
});
it('should push metrics for a failed task execution', async () => {
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
await task.execute();
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
});
it('should allow custom handlers to be registered', async () => {
await task.execute();
const fooValues = await getMetricValues('utapi_custom_task_foo_total');
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
});
});

11
tests/utils/prom.js Normal file
View File

@ -0,0 +1,11 @@
const promClient = require('prom-client');
async function getMetricValues(name) {
const metric = await promClient.register.getSingleMetric(name);
const data = await metric.get();
return data.values;
}
module.exports = {
getMetricValues,
};