Compare commits
4 Commits
7dd49ca418
...
88982fffe6
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 88982fffe6 | |
Taylor McKinnon | 67ffa99861 | |
Taylor McKinnon | 3993acade1 | |
Taylor McKinnon | 93ea97334d |
|
@ -27,7 +27,7 @@ x-models:
|
|||
|
||||
services:
|
||||
redis-0:
|
||||
image: redis:5
|
||||
image: redis:6
|
||||
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||
ports:
|
||||
- 6379:6379
|
||||
|
@ -35,7 +35,7 @@ services:
|
|||
- HOST_IP="${EXTERNAL_HOST}"
|
||||
|
||||
redis-1:
|
||||
image: redis:5
|
||||
image: redis:6
|
||||
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||
ports:
|
||||
- 6380:6380
|
||||
|
@ -43,7 +43,7 @@ services:
|
|||
- HOST_IP="${EXTERNAL_HOST}"
|
||||
|
||||
redis-sentinel-0:
|
||||
image: redis:5
|
||||
image: redis:6
|
||||
command: |-
|
||||
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
||||
port 16379
|
||||
|
|
|
@ -54,5 +54,16 @@
|
|||
"filter": {
|
||||
"allow": {},
|
||||
"deny": {}
|
||||
},
|
||||
"metrics" : {
|
||||
"enabled": false,
|
||||
"host": "localhost",
|
||||
"serverPort": 10901,
|
||||
"ingestPort": 10902,
|
||||
"checkpointPort": 10903,
|
||||
"snapshotPort": 10904,
|
||||
"diskUsagePort": 10905,
|
||||
"reindexPort": 10906,
|
||||
"repairPort": 10907
|
||||
}
|
||||
}
|
||||
|
|
|
@ -375,6 +375,18 @@ class Config {
|
|||
|
||||
parsedConfig.filter = Config._parseResourceFilters(config.filter);
|
||||
|
||||
parsedConfig.metrics = {
|
||||
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
|
||||
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
|
||||
serverPort: _loadFromEnv('METRICS_PORT_SERVER', config.metrics.serverPort, _typeCasts.int),
|
||||
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
|
||||
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
|
||||
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
|
||||
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
|
||||
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
|
||||
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
|
||||
};
|
||||
|
||||
return parsedConfig;
|
||||
}
|
||||
|
||||
|
|
|
@ -115,7 +115,17 @@ const schema = Joi.object({
|
|||
return filterObj;
|
||||
}, {},
|
||||
)),
|
||||
metrics: {
|
||||
enabled: Joi.boolean(),
|
||||
host: Joi.string(),
|
||||
serverPort: Joi.number().port(),
|
||||
ingestPort: Joi.number().port(),
|
||||
checkpointPort: Joi.number().port(),
|
||||
snapshotPort: Joi.number().port(),
|
||||
diskUsagePort: Joi.number().port(),
|
||||
reindexPort: Joi.number().port(),
|
||||
repairPort: Joi.number().port(),
|
||||
},
|
||||
});
|
||||
|
||||
module.exports = schema;
|
||||
|
||||
|
|
|
@ -96,7 +96,7 @@ async function listMetric(ctx, params) {
|
|||
|
||||
const metric = {
|
||||
...result.metrics,
|
||||
timeRange: [ start, end ],
|
||||
timeRange: [start, end],
|
||||
operations: {
|
||||
...emptyOperationsResponse,
|
||||
...operations,
|
||||
|
|
|
@ -1,10 +1,12 @@
|
|||
const assert = require('assert');
|
||||
const cron = require('node-schedule');
|
||||
const cronparser = require('cron-parser');
|
||||
const promClient = require('prom-client');
|
||||
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
||||
|
||||
const { client: cacheClient } = require('../cache');
|
||||
const Process = require('../process');
|
||||
const { LoggerContext, iterIfError } = require('../utils');
|
||||
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'BaseTask',
|
||||
|
@ -22,6 +24,11 @@ class BaseTask extends Process {
|
|||
this._scheduler = null;
|
||||
this._defaultSchedule = Now;
|
||||
this._defaultLag = 0;
|
||||
this._enableMetrics = options.enableMetrics || false;
|
||||
this._metricsHost = options.metricsHost || 'localhost';
|
||||
this._metricsPort = options.metricsPort || 9001;
|
||||
this._metricsHandlers = null;
|
||||
this._probeServer = null;
|
||||
}
|
||||
|
||||
async _setup(includeDefaultOpts = true) {
|
||||
|
@ -39,6 +46,70 @@ class BaseTask extends Process {
|
|||
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||
.option('-n, --node-id <id>', 'Set a custom node id');
|
||||
}
|
||||
|
||||
if (this._enableMetrics) {
|
||||
this._metricsHandlers = {
|
||||
...this._registerDefaultMetricHandlers(),
|
||||
...this._registerMetricHandlers(),
|
||||
};
|
||||
await this._createProbeServer();
|
||||
}
|
||||
}
|
||||
|
||||
_registerDefaultMetricHandlers() {
|
||||
const taskName = this.constructor.name;
|
||||
|
||||
// Get the name of our subclass in snake case format eg BaseClass => _base_class
|
||||
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
|
||||
|
||||
const executionDuration = new promClient.Gauge({
|
||||
name: `utapi${taskNameSnake}_duration_seconds`,
|
||||
help: `Execution time of the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const executionAttempts = new promClient.Counter({
|
||||
name: `utapi${taskNameSnake}_attempts_total`,
|
||||
help: `Number of attempts to execute the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const executionFailures = new promClient.Counter({
|
||||
name: `utapi${taskNameSnake}_failures_total`,
|
||||
help: `Number of failures executing the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
executionDuration,
|
||||
executionAttempts,
|
||||
executionFailures,
|
||||
};
|
||||
}
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
return {};
|
||||
}
|
||||
|
||||
async _createProbeServer() {
|
||||
this._probeServer = await startProbeServer({
|
||||
host: this._metricsHost,
|
||||
port: this._metricsPort,
|
||||
});
|
||||
|
||||
this._probeServer.addHandler(
|
||||
DEFAULT_METRICS_ROUTE,
|
||||
(res, log) => {
|
||||
log.debug('metrics requested');
|
||||
res.writeHead(200, {
|
||||
'Content-Type': promClient.register.contentType,
|
||||
});
|
||||
promClient.register.metrics().then(metrics => {
|
||||
res.end(metrics);
|
||||
});
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
get schedule() {
|
||||
|
@ -79,12 +150,23 @@ class BaseTask extends Process {
|
|||
}
|
||||
|
||||
async execute() {
|
||||
let endTimer;
|
||||
if (this._enableMetrics) {
|
||||
endTimer = this._metricsHandlers.executionDuration.startTimer();
|
||||
this._metricsHandlers.executionAttempts.inc(1);
|
||||
}
|
||||
|
||||
try {
|
||||
const timestamp = new Date() * 1000; // Timestamp in microseconds;
|
||||
const laggedTimestamp = timestamp - (this.lag * 1000000);
|
||||
await this._execute(laggedTimestamp);
|
||||
} catch (error) {
|
||||
logger.error('Error during task execution', { error });
|
||||
this._metricsHandlers.executionFailures.inc(1);
|
||||
}
|
||||
|
||||
if (this._enableMetrics) {
|
||||
endTimer();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,6 +176,9 @@ class BaseTask extends Process {
|
|||
}
|
||||
|
||||
async _join() {
|
||||
if (this._probeServer !== null) {
|
||||
this._probeServer.stop();
|
||||
}
|
||||
return this._cache.disconnect();
|
||||
}
|
||||
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
const promClient = require('prom-client');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
|||
|
||||
class CreateCheckpoint extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.checkpointPort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.checkpointSchedule;
|
||||
this._defaultLag = checkpointLagSecs;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 'utapi_create_checkpoint_created_total',
|
||||
help: 'Number of checkpoints created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
created,
|
||||
};
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
||||
const status = await this.withWarp10(async warp10 => {
|
||||
|
@ -29,6 +49,7 @@ class CreateCheckpoint extends BaseTask {
|
|||
});
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
||||
this._metricsHandlers.created.inc(status.result[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
const promClient = require('prom-client');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { snapshotLagSecs } = require('../constants');
|
||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
|||
|
||||
class CreateSnapshot extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.snapshotPort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.snapshotSchedule;
|
||||
this._defaultLag = snapshotLagSecs;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 'utapi_create_snapshot_created_total',
|
||||
help: 'Number of snapshots created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
created,
|
||||
};
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
|
||||
|
||||
|
@ -29,6 +49,7 @@ class CreateSnapshot extends BaseTask {
|
|||
});
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0]} snapshots`);
|
||||
this._metricsHandlers.created.inc(status.result[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,6 @@
|
|||
const async = require('async');
|
||||
const Path = require('path');
|
||||
const promClient = require('prom-client');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { expirationChunkDuration } = require('../constants');
|
||||
|
@ -16,9 +18,13 @@ const ACTION_THRESHOLD = 0.95;
|
|||
|
||||
class MonitorDiskUsage extends BaseTask {
|
||||
constructor(options) {
|
||||
super(
|
||||
options,
|
||||
);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.diskUsagePort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.diskUsageSchedule;
|
||||
this._defaultLag = 0;
|
||||
this._path = config.diskUsage.path;
|
||||
|
@ -42,6 +48,39 @@ class MonitorDiskUsage extends BaseTask {
|
|||
);
|
||||
}
|
||||
|
||||
_registerMetricHandlers() {
|
||||
const isLocked = new promClient.Gauge({
|
||||
name: 'utapi_monitor_disk_usage_is_locked',
|
||||
help: 'Indicates whether the monitored warp 10 has had writes disabled',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const leveldbBytes = new promClient.Gauge({
|
||||
name: 'utapi_monitor_disk_usage_leveldb_bytes',
|
||||
help: 'Total bytes used by warp 10',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const datalogBytes = new promClient.Gauge({
|
||||
name: 'utapi_monitor_disk_usage_datalog_bytes',
|
||||
help: 'Total bytes used by warp 10',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const hardLimitRatio = new promClient.Gauge({
|
||||
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
|
||||
help: 'Percent of the hard limit used by warp 10',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
isLocked,
|
||||
leveldbBytes,
|
||||
datalogBytes,
|
||||
hardLimitRatio,
|
||||
};
|
||||
}
|
||||
|
||||
get isLeader() {
|
||||
return this._program.leader !== undefined;
|
||||
}
|
||||
|
@ -54,9 +93,9 @@ class MonitorDiskUsage extends BaseTask {
|
|||
return this._program.lock !== undefined;
|
||||
}
|
||||
|
||||
_getUsage() {
|
||||
moduleLogger.debug(`calculating disk usage for ${this._path}`);
|
||||
return getFolderSize(this._path);
|
||||
_getUsage(path) {
|
||||
moduleLogger.debug(`calculating disk usage for ${path}`);
|
||||
return getFolderSize(path);
|
||||
}
|
||||
|
||||
async _expireMetrics(timestamp) {
|
||||
|
@ -113,6 +152,8 @@ class MonitorDiskUsage extends BaseTask {
|
|||
nodeId,
|
||||
});
|
||||
|
||||
this._metricsHandlers.hardLimitRatio.set(hardPercentage)
|
||||
|
||||
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
|
||||
|
||||
if (hardPercentage < WARN_THRESHOLD) {
|
||||
|
@ -150,12 +191,14 @@ class MonitorDiskUsage extends BaseTask {
|
|||
if (this.isManualUnlock) {
|
||||
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
|
||||
await this._enableWarp10Updates();
|
||||
this._metricsHandlers.isLocked.set(0)
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isManualLock) {
|
||||
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
|
||||
await this._disableWarp10Updates();
|
||||
this._metricsHandlers.isLocked.set(1)
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -170,14 +213,20 @@ class MonitorDiskUsage extends BaseTask {
|
|||
return;
|
||||
}
|
||||
|
||||
let size = null;
|
||||
let leveldbBytes = null;
|
||||
let datalogBytes = null
|
||||
try {
|
||||
size = await this._getUsage();
|
||||
leveldbBytes = await this._getUsage(Path.join(this._path, 'data', 'leveldb'));
|
||||
datalogBytes = await this._getUsage(Path.join(this._path, 'data', 'datalog'));
|
||||
} catch (error) {
|
||||
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
|
||||
return;
|
||||
}
|
||||
|
||||
this._metricsHandlers.leveldbBytes.set(leveldbBytes);
|
||||
this._metricsHandlers.datalogBytes.set(datalogBytes);
|
||||
|
||||
const size = leveldbBytes + datalogBytes;
|
||||
if (this._hardLimit !== null) {
|
||||
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
|
||||
|
||||
|
@ -185,10 +234,12 @@ class MonitorDiskUsage extends BaseTask {
|
|||
if (shouldLock) {
|
||||
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
|
||||
await this._disableWarp10Updates();
|
||||
this._metricsHandlers.isLocked.set(1)
|
||||
} else {
|
||||
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
|
||||
{ nodeId: this.nodeId });
|
||||
await this._enableWarp10Updates();
|
||||
this._metricsHandlers.isLocked.set(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const promClient = require('prom-client');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const { UtapiMetric } = require('../models');
|
||||
const config = require('../config');
|
||||
|
@ -16,12 +17,38 @@ const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
|||
|
||||
class IngestShardTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.ingestPort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.ingestionSchedule;
|
||||
this._defaultLag = config.ingestionLagSeconds;
|
||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const ingestedTotal = new promClient.Counter({
|
||||
name: 'utapi_ingest_shard_task_ingest_total',
|
||||
help: 'Number of metrics ingested',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const ingestedSlow = new promClient.Counter({
|
||||
name: 'utapi_ingest_shard_task_slow_total',
|
||||
help: 'Number of slow metrics ingested',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
ingestedTotal,
|
||||
ingestedSlow,
|
||||
};
|
||||
}
|
||||
|
||||
_hydrateEvent(data, stripTimestamp = false) {
|
||||
const event = JSON.parse(data);
|
||||
if (this._stripEventUUID) {
|
||||
|
@ -84,6 +111,11 @@ class IngestShardTask extends BaseTask {
|
|||
assert.strictEqual(status, records.length);
|
||||
await this._cache.deleteShard(shard);
|
||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||
|
||||
this._metricsHandlers.ingestedTotal.inc(records.length);
|
||||
if (areSlowEvents) {
|
||||
this._metricsHandlers.ingestedSlow.inc(records.length);
|
||||
}
|
||||
} else {
|
||||
logger.debug('No events found in shard, cleaning up');
|
||||
}
|
||||
|
|
|
@ -20,7 +20,13 @@ const logger = new LoggerContext({
|
|||
|
||||
class ReindexTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.reindexPort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.reindexSchedule;
|
||||
this._defaultLag = 0;
|
||||
const eventFilters = (config && config.filter) || {};
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
const promClient = require('prom-client');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const config = require('../config');
|
||||
const { LoggerContext } = require('../utils');
|
||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
|||
|
||||
class RepairTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
super({
|
||||
...options,
|
||||
enableMetrics: config.metrics.enabled,
|
||||
metricsHost: config.metrics.host,
|
||||
metricsPort: config.metrics.repairPort,
|
||||
});
|
||||
|
||||
this._defaultSchedule = config.repairSchedule;
|
||||
this._defaultLag = repairLagSecs;
|
||||
}
|
||||
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 'utapi_repair_task_created_total',
|
||||
help: 'Number of repair records created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
return {
|
||||
created,
|
||||
};
|
||||
}
|
||||
|
||||
async _execute(timestamp) {
|
||||
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
||||
|
||||
|
@ -30,6 +50,7 @@ class RepairTask extends BaseTask {
|
|||
});
|
||||
if (status.result[0]) {
|
||||
logger.info(`created ${status.result[0]} corrections`);
|
||||
this._metricsHandlers.created.inc(status.result[0]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ const timestamp = require('./timestamp');
|
|||
const func = require('./func');
|
||||
const disk = require('./disk');
|
||||
const filter = require('./filter');
|
||||
const probe = require('./probe');
|
||||
|
||||
module.exports = {
|
||||
...log,
|
||||
|
@ -12,4 +13,5 @@ module.exports = {
|
|||
...func,
|
||||
...disk,
|
||||
...filter,
|
||||
...probe,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
|
||||
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
|
||||
|
||||
/**
|
||||
* Configure probe servers
|
||||
* @typedef {Object} ProbeServerConfig
|
||||
* @property {string} bindAddress - Address to bind probe server to
|
||||
* @property {number} port - Port to bind probe server to
|
||||
*/
|
||||
|
||||
/**
 * Create a probe server without handlers and wait for it to listen
 * @async
 * @param {ProbeServerConfig} config - Configuration for probe server
 * @returns {Promise<ProbeServer>} - Resolves with the listening
 *   ProbeServer, rejects with the error the server reports
 * @throws {Error} when no configuration is given (surfaces as a
 *   rejected promise because the function is async)
 */
async function startProbeServer(config) {
    if (!config) {
        throw new Error('configuration for probe server is missing');
    }

    const waitUntilListening = (resolve, reject) => {
        const server = new ProbeServer(config);
        server.onListening(() => {
            resolve(server);
        });
        server.onError(error => {
            reject(error);
        });
        server.start();
    };

    return new Promise(waitUntilListening);
}
|
||||
|
||||
module.exports = {
|
||||
startProbeServer,
|
||||
};
|
|
@ -0,0 +1,83 @@
|
|||
const assert = require('assert');
|
||||
const needle = require('needle');
|
||||
const promClient = require('prom-client');
|
||||
const sinon = require('sinon');
|
||||
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
||||
|
||||
const { BaseTask } = require('../../../../libV2/tasks');
|
||||
const { clients: warp10Clients } = require('../../../../libV2/warp10');
|
||||
|
||||
const { getMetricValues } = require('../../../utils/prom');
|
||||
|
||||
const METRICS_SERVER_PORT = 10999;
|
||||
|
||||
/**
 * Minimal task used by the tests below: it registers a single custom
 * gauge and increments it on every execution.
 */
class CustomTask extends BaseTask {
    // eslint-disable-next-line class-methods-use-this
    _registerMetricHandlers() {
        return {
            foo: new promClient.Gauge({
                name: 'utapi_custom_task_foo_total',
                help: 'Count of foos',
                labelNames: ['origin', 'containerName'],
            }),
        };
    }

    async _execute() {
        const { foo } = this._metricsHandlers;
        foo.inc(1);
    }
}
|
||||
|
||||
// Functional tests for the metrics support built into BaseTask: the
// probe server endpoint, the default execution metrics and the hook
// that lets subclasses register their own handlers.
describe('Test BaseTask metrics', () => {
    let task;

    beforeEach(async () => {
        task = new CustomTask({
            enableMetrics: true,
            metricsPort: METRICS_SERVER_PORT,
            warp10: [warp10Clients[0]],
        });
        await task.setup();
    });

    afterEach(async () => {
        // Drop any fakes installed with sinon.replace.
        sinon.restore();
        try {
            // Wait for shutdown so the next test does not start its
            // server while this one is still being torn down.
            await task.join();
        } finally {
            // Always empty the registry, otherwise the next setup fails
            // on already registered metric names.
            promClient.register.clear();
        }
    });

    it('should start a metrics server on the provided port', async () => {
        const res = await needle(
            'get',
            `http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
        );
        const lines = res.body.split('\n');
        const first = lines[0];
        assert.strictEqual(res.statusCode, 200);
        assert(first.startsWith('# HELP'));
    });

    it('should push metrics for a task execution', async () => {
        await task.execute();
        // BaseTask registers the gauge as utapi<task>_duration_seconds.
        const timeValues = await getMetricValues('utapi_custom_task_duration_seconds');
        assert.strictEqual(timeValues.length, 1);

        const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
        assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);

        const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
        assert.deepStrictEqual(failuresValues, []);
    });

    it('should push metrics for a failed task execution', async () => {
        sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
        await task.execute();
        const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
        assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
    });

    it('should allow custom handlers to be registered', async () => {
        await task.execute();
        const fooValues = await getMetricValues('utapi_custom_task_foo_total');
        assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
    });
});
|
|
@ -0,0 +1,11 @@
|
|||
const promClient = require('prom-client');
|
||||
|
||||
/**
 * Read the current values of a metric from the default prom-client
 * registry.
 *
 * @async
 * @param {string} name - Name the metric was registered under
 * @returns {Promise<Array>} the `values` array reported by the metric
 * @throws {Error} when no metric with that name is registered
 */
async function getMetricValues(name) {
    const metric = promClient.register.getSingleMetric(name);
    if (metric === undefined) {
        // Name the missing metric rather than failing later with an
        // opaque "cannot read properties of undefined".
        throw new Error(`metric ${name} is not registered`);
    }

    const { values } = await metric.get();
    return values;
}
|
||||
|
||||
module.exports = {
|
||||
getMetricValues,
|
||||
};
|
Loading…
Reference in New Issue