Compare commits
4 Commits
7dd49ca418
...
88982fffe6
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 88982fffe6 | |
Taylor McKinnon | 67ffa99861 | |
Taylor McKinnon | 3993acade1 | |
Taylor McKinnon | 93ea97334d |
|
@ -27,7 +27,7 @@ x-models:
|
||||||
|
|
||||||
services:
|
services:
|
||||||
redis-0:
|
redis-0:
|
||||||
image: redis:5
|
image: redis:6
|
||||||
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||||
ports:
|
ports:
|
||||||
- 6379:6379
|
- 6379:6379
|
||||||
|
@ -35,7 +35,7 @@ services:
|
||||||
- HOST_IP="${EXTERNAL_HOST}"
|
- HOST_IP="${EXTERNAL_HOST}"
|
||||||
|
|
||||||
redis-1:
|
redis-1:
|
||||||
image: redis:5
|
image: redis:6
|
||||||
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||||
ports:
|
ports:
|
||||||
- 6380:6380
|
- 6380:6380
|
||||||
|
@ -43,7 +43,7 @@ services:
|
||||||
- HOST_IP="${EXTERNAL_HOST}"
|
- HOST_IP="${EXTERNAL_HOST}"
|
||||||
|
|
||||||
redis-sentinel-0:
|
redis-sentinel-0:
|
||||||
image: redis:5
|
image: redis:6
|
||||||
command: |-
|
command: |-
|
||||||
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
||||||
port 16379
|
port 16379
|
||||||
|
|
|
@ -54,5 +54,16 @@
|
||||||
"filter": {
|
"filter": {
|
||||||
"allow": {},
|
"allow": {},
|
||||||
"deny": {}
|
"deny": {}
|
||||||
|
},
|
||||||
|
"metrics" : {
|
||||||
|
"enabled": false,
|
||||||
|
"host": "localhost",
|
||||||
|
"serverPort": 10901,
|
||||||
|
"ingestPort": 10902,
|
||||||
|
"checkpointPort": 10903,
|
||||||
|
"snapshotPort": 10904,
|
||||||
|
"diskUsagePort": 10905,
|
||||||
|
"reindexPort": 10906,
|
||||||
|
"repairPort": 10907
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -375,6 +375,18 @@ class Config {
|
||||||
|
|
||||||
parsedConfig.filter = Config._parseResourceFilters(config.filter);
|
parsedConfig.filter = Config._parseResourceFilters(config.filter);
|
||||||
|
|
||||||
|
parsedConfig.metrics = {
|
||||||
|
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
|
||||||
|
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
|
||||||
|
serverPort: _loadFromEnv('METRICS_PORT_SERVER', config.metrics.serverPort, _typeCasts.int),
|
||||||
|
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
|
||||||
|
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
|
||||||
|
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
|
||||||
|
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
|
||||||
|
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
|
||||||
|
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
|
||||||
|
};
|
||||||
|
|
||||||
return parsedConfig;
|
return parsedConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,7 +115,17 @@ const schema = Joi.object({
|
||||||
return filterObj;
|
return filterObj;
|
||||||
}, {},
|
}, {},
|
||||||
)),
|
)),
|
||||||
|
metrics: {
|
||||||
|
enabled: Joi.boolean(),
|
||||||
|
host: Joi.string(),
|
||||||
|
serverPort: Joi.number().port(),
|
||||||
|
ingestPort: Joi.number().port(),
|
||||||
|
checkpointPort: Joi.number().port(),
|
||||||
|
snapshotPort: Joi.number().port(),
|
||||||
|
diskUsagePort: Joi.number().port(),
|
||||||
|
reindexPort: Joi.number().port(),
|
||||||
|
repairPort: Joi.number().port(),
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
module.exports = schema;
|
module.exports = schema;
|
||||||
|
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const cron = require('node-schedule');
|
const cron = require('node-schedule');
|
||||||
const cronparser = require('cron-parser');
|
const cronparser = require('cron-parser');
|
||||||
|
const promClient = require('prom-client');
|
||||||
|
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
||||||
|
|
||||||
const { client: cacheClient } = require('../cache');
|
const { client: cacheClient } = require('../cache');
|
||||||
const Process = require('../process');
|
const Process = require('../process');
|
||||||
const { LoggerContext, iterIfError } = require('../utils');
|
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
module: 'BaseTask',
|
module: 'BaseTask',
|
||||||
|
@ -22,6 +24,11 @@ class BaseTask extends Process {
|
||||||
this._scheduler = null;
|
this._scheduler = null;
|
||||||
this._defaultSchedule = Now;
|
this._defaultSchedule = Now;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
|
this._enableMetrics = options.enableMetrics || false;
|
||||||
|
this._metricsHost = options.metricsHost || 'localhost';
|
||||||
|
this._metricsPort = options.metricsPort || 9001;
|
||||||
|
this._metricsHandlers = null;
|
||||||
|
this._probeServer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _setup(includeDefaultOpts = true) {
|
async _setup(includeDefaultOpts = true) {
|
||||||
|
@ -39,6 +46,70 @@ class BaseTask extends Process {
|
||||||
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||||
.option('-n, --node-id <id>', 'Set a custom node id');
|
.option('-n, --node-id <id>', 'Set a custom node id');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (this._enableMetrics) {
|
||||||
|
this._metricsHandlers = {
|
||||||
|
...this._registerDefaultMetricHandlers(),
|
||||||
|
...this._registerMetricHandlers(),
|
||||||
|
};
|
||||||
|
await this._createProbeServer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_registerDefaultMetricHandlers() {
|
||||||
|
const taskName = this.constructor.name;
|
||||||
|
|
||||||
|
// Get the name of our subclass in snake case format eg BaseClass => _base_class
|
||||||
|
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
|
||||||
|
|
||||||
|
const executionDuration = new promClient.Gauge({
|
||||||
|
name: `utapi${taskNameSnake}_duration_seconds`,
|
||||||
|
help: `Execution time of the ${taskName} task`,
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const executionAttempts = new promClient.Counter({
|
||||||
|
name: `utapi${taskNameSnake}_attempts_total`,
|
||||||
|
help: `Number of attempts to execute the ${taskName} task`,
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const executionFailures = new promClient.Counter({
|
||||||
|
name: `utapi${taskNameSnake}_failures_total`,
|
||||||
|
help: `Number of failures executing the ${taskName} task`,
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
executionDuration,
|
||||||
|
executionAttempts,
|
||||||
|
executionFailures,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
async _createProbeServer() {
|
||||||
|
this._probeServer = await startProbeServer({
|
||||||
|
host: this._metricsHost,
|
||||||
|
port: this._metricsPort,
|
||||||
|
});
|
||||||
|
|
||||||
|
this._probeServer.addHandler(
|
||||||
|
DEFAULT_METRICS_ROUTE,
|
||||||
|
(res, log) => {
|
||||||
|
log.debug('metrics requested');
|
||||||
|
res.writeHead(200, {
|
||||||
|
'Content-Type': promClient.register.contentType,
|
||||||
|
});
|
||||||
|
promClient.register.metrics().then(metrics => {
|
||||||
|
res.end(metrics);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
get schedule() {
|
get schedule() {
|
||||||
|
@ -79,12 +150,23 @@ class BaseTask extends Process {
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute() {
|
async execute() {
|
||||||
|
let endTimer;
|
||||||
|
if (this._enableMetrics) {
|
||||||
|
endTimer = this._metricsHandlers.executionDuration.startTimer();
|
||||||
|
this._metricsHandlers.executionAttempts.inc(1);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const timestamp = new Date() * 1000; // Timestamp in microseconds;
|
const timestamp = new Date() * 1000; // Timestamp in microseconds;
|
||||||
const laggedTimestamp = timestamp - (this.lag * 1000000);
|
const laggedTimestamp = timestamp - (this.lag * 1000000);
|
||||||
await this._execute(laggedTimestamp);
|
await this._execute(laggedTimestamp);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Error during task execution', { error });
|
logger.error('Error during task execution', { error });
|
||||||
|
this._metricsHandlers.executionFailures.inc(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this._enableMetrics) {
|
||||||
|
endTimer();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -94,6 +176,9 @@ class BaseTask extends Process {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _join() {
|
async _join() {
|
||||||
|
if (this._probeServer !== null) {
|
||||||
|
this._probeServer.stop();
|
||||||
|
}
|
||||||
return this._cache.disconnect();
|
return this._cache.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
const promClient = require('prom-client');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class CreateCheckpoint extends BaseTask {
|
class CreateCheckpoint extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super({
|
||||||
|
...options,
|
||||||
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.checkpointPort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.checkpointSchedule;
|
this._defaultSchedule = config.checkpointSchedule;
|
||||||
this._defaultLag = checkpointLagSecs;
|
this._defaultLag = checkpointLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const created = new promClient.Counter({
|
||||||
|
name: 'utapi_create_checkpoint_created_total',
|
||||||
|
help: 'Number of checkpoints created',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
created,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
||||||
const status = await this.withWarp10(async warp10 => {
|
const status = await this.withWarp10(async warp10 => {
|
||||||
|
@ -29,6 +49,7 @@ class CreateCheckpoint extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
||||||
|
this._metricsHandlers.created.inc(status.result[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
const promClient = require('prom-client');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { snapshotLagSecs } = require('../constants');
|
const { snapshotLagSecs } = require('../constants');
|
||||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class CreateSnapshot extends BaseTask {
|
class CreateSnapshot extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super({
|
||||||
|
...options,
|
||||||
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.snapshotPort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.snapshotSchedule;
|
this._defaultSchedule = config.snapshotSchedule;
|
||||||
this._defaultLag = snapshotLagSecs;
|
this._defaultLag = snapshotLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const created = new promClient.Counter({
|
||||||
|
name: 'utapi_create_snapshot_created_total',
|
||||||
|
help: 'Number of snapshots created',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
created,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
|
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
|
||||||
|
|
||||||
|
@ -29,6 +49,7 @@ class CreateSnapshot extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0]} snapshots`);
|
logger.info(`created ${status.result[0]} snapshots`);
|
||||||
|
this._metricsHandlers.created.inc(status.result[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,6 @@
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
const Path = require('path');
|
||||||
|
const promClient = require('prom-client');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { expirationChunkDuration } = require('../constants');
|
const { expirationChunkDuration } = require('../constants');
|
||||||
|
@ -16,9 +18,13 @@ const ACTION_THRESHOLD = 0.95;
|
||||||
|
|
||||||
class MonitorDiskUsage extends BaseTask {
|
class MonitorDiskUsage extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(
|
super({
|
||||||
options,
|
...options,
|
||||||
);
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.diskUsagePort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.diskUsageSchedule;
|
this._defaultSchedule = config.diskUsageSchedule;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
this._path = config.diskUsage.path;
|
this._path = config.diskUsage.path;
|
||||||
|
@ -42,6 +48,39 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const isLocked = new promClient.Gauge({
|
||||||
|
name: 'utapi_monitor_disk_usage_is_locked',
|
||||||
|
help: 'Indicates whether the monitored warp 10 has had writes disabled',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const leveldbBytes = new promClient.Gauge({
|
||||||
|
name: 'utapi_monitor_disk_usage_leveldb_bytes',
|
||||||
|
help: 'Total bytes used by warp 10',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const datalogBytes = new promClient.Gauge({
|
||||||
|
name: 'utapi_monitor_disk_usage_datalog_bytes',
|
||||||
|
help: 'Total bytes used by warp 10',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const hardLimitRatio = new promClient.Gauge({
|
||||||
|
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
|
||||||
|
help: 'Percent of the hard limit used by warp 10',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
isLocked,
|
||||||
|
leveldbBytes,
|
||||||
|
datalogBytes,
|
||||||
|
hardLimitRatio,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
get isLeader() {
|
get isLeader() {
|
||||||
return this._program.leader !== undefined;
|
return this._program.leader !== undefined;
|
||||||
}
|
}
|
||||||
|
@ -54,9 +93,9 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
return this._program.lock !== undefined;
|
return this._program.lock !== undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
_getUsage() {
|
_getUsage(path) {
|
||||||
moduleLogger.debug(`calculating disk usage for ${this._path}`);
|
moduleLogger.debug(`calculating disk usage for ${path}`);
|
||||||
return getFolderSize(this._path);
|
return getFolderSize(path);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _expireMetrics(timestamp) {
|
async _expireMetrics(timestamp) {
|
||||||
|
@ -113,6 +152,8 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
nodeId,
|
nodeId,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
this._metricsHandlers.hardLimitRatio.set(hardPercentage)
|
||||||
|
|
||||||
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
|
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
|
||||||
|
|
||||||
if (hardPercentage < WARN_THRESHOLD) {
|
if (hardPercentage < WARN_THRESHOLD) {
|
||||||
|
@ -150,12 +191,14 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
if (this.isManualUnlock) {
|
if (this.isManualUnlock) {
|
||||||
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
|
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
|
||||||
await this._enableWarp10Updates();
|
await this._enableWarp10Updates();
|
||||||
|
this._metricsHandlers.isLocked.set(0)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isManualLock) {
|
if (this.isManualLock) {
|
||||||
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
|
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
|
||||||
await this._disableWarp10Updates();
|
await this._disableWarp10Updates();
|
||||||
|
this._metricsHandlers.isLocked.set(1)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,14 +213,20 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let size = null;
|
let leveldbBytes = null;
|
||||||
|
let datalogBytes = null
|
||||||
try {
|
try {
|
||||||
size = await this._getUsage();
|
leveldbBytes = await this._getUsage(Path.join(this._path, 'data', 'leveldb'));
|
||||||
|
datalogBytes = await this._getUsage(Path.join(this._path, 'data', 'datalog'));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
|
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this._metricsHandlers.leveldbBytes.set(leveldbBytes);
|
||||||
|
this._metricsHandlers.datalogBytes.set(datalogBytes);
|
||||||
|
|
||||||
|
const size = leveldbBytes + datalogBytes;
|
||||||
if (this._hardLimit !== null) {
|
if (this._hardLimit !== null) {
|
||||||
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
|
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
|
||||||
|
|
||||||
|
@ -185,10 +234,12 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
if (shouldLock) {
|
if (shouldLock) {
|
||||||
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
|
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
|
||||||
await this._disableWarp10Updates();
|
await this._disableWarp10Updates();
|
||||||
|
this._metricsHandlers.isLocked.set(1)
|
||||||
} else {
|
} else {
|
||||||
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
|
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
|
||||||
{ nodeId: this.nodeId });
|
{ nodeId: this.nodeId });
|
||||||
await this._enableWarp10Updates();
|
await this._enableWarp10Updates();
|
||||||
|
this._metricsHandlers.isLocked.set(0)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
const promClient = require('prom-client');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const { UtapiMetric } = require('../models');
|
const { UtapiMetric } = require('../models');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
|
@ -16,12 +17,38 @@ const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||||
|
|
||||||
class IngestShardTask extends BaseTask {
|
class IngestShardTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super({
|
||||||
|
...options,
|
||||||
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.ingestPort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.ingestionSchedule;
|
this._defaultSchedule = config.ingestionSchedule;
|
||||||
this._defaultLag = config.ingestionLagSeconds;
|
this._defaultLag = config.ingestionLagSeconds;
|
||||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const ingestedTotal = new promClient.Counter({
|
||||||
|
name: 'utapi_ingest_shard_task_ingest_total',
|
||||||
|
help: 'Number of metrics ingested',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
const ingestedSlow = new promClient.Counter({
|
||||||
|
name: 'utapi_ingest_shard_task_slow_total',
|
||||||
|
help: 'Number of slow metrics ingested',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
ingestedTotal,
|
||||||
|
ingestedSlow,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
_hydrateEvent(data, stripTimestamp = false) {
|
_hydrateEvent(data, stripTimestamp = false) {
|
||||||
const event = JSON.parse(data);
|
const event = JSON.parse(data);
|
||||||
if (this._stripEventUUID) {
|
if (this._stripEventUUID) {
|
||||||
|
@ -84,6 +111,11 @@ class IngestShardTask extends BaseTask {
|
||||||
assert.strictEqual(status, records.length);
|
assert.strictEqual(status, records.length);
|
||||||
await this._cache.deleteShard(shard);
|
await this._cache.deleteShard(shard);
|
||||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||||
|
|
||||||
|
this._metricsHandlers.ingestedTotal.inc(records.length);
|
||||||
|
if (areSlowEvents) {
|
||||||
|
this._metricsHandlers.ingestedSlow.inc(records.length);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.debug('No events found in shard, cleaning up');
|
logger.debug('No events found in shard, cleaning up');
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,13 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class ReindexTask extends BaseTask {
|
class ReindexTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super({
|
||||||
|
...options,
|
||||||
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.reindexPort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.reindexSchedule;
|
this._defaultSchedule = config.reindexSchedule;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
const eventFilters = (config && config.filter) || {};
|
const eventFilters = (config && config.filter) || {};
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
const promClient = require('prom-client');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { LoggerContext } = require('../utils');
|
const { LoggerContext } = require('../utils');
|
||||||
|
@ -9,11 +10,30 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class RepairTask extends BaseTask {
|
class RepairTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super(options);
|
super({
|
||||||
|
...options,
|
||||||
|
enableMetrics: config.metrics.enabled,
|
||||||
|
metricsHost: config.metrics.host,
|
||||||
|
metricsPort: config.metrics.repairPort,
|
||||||
|
});
|
||||||
|
|
||||||
this._defaultSchedule = config.repairSchedule;
|
this._defaultSchedule = config.repairSchedule;
|
||||||
this._defaultLag = repairLagSecs;
|
this._defaultLag = repairLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const created = new promClient.Counter({
|
||||||
|
name: 'utapi_repair_task_created_total',
|
||||||
|
help: 'Number of repair records created',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return {
|
||||||
|
created,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
||||||
|
|
||||||
|
@ -30,6 +50,7 @@ class RepairTask extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0]} corrections`);
|
logger.info(`created ${status.result[0]} corrections`);
|
||||||
|
this._metricsHandlers.created.inc(status.result[0]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ const timestamp = require('./timestamp');
|
||||||
const func = require('./func');
|
const func = require('./func');
|
||||||
const disk = require('./disk');
|
const disk = require('./disk');
|
||||||
const filter = require('./filter');
|
const filter = require('./filter');
|
||||||
|
const probe = require('./probe');
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
...log,
|
...log,
|
||||||
|
@ -12,4 +13,5 @@ module.exports = {
|
||||||
...func,
|
...func,
|
||||||
...disk,
|
...disk,
|
||||||
...filter,
|
...filter,
|
||||||
|
...probe,
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
|
||||||
|
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configure probe servers
|
||||||
|
* @typedef {Object} ProbeServerConfig
|
||||||
|
* @property {string} bindAddress - Address to bind probe server to
|
||||||
|
* @property {number} port - Port to bind probe server to
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start an empty probe server
|
||||||
|
* @async
|
||||||
|
* @param {ProbeServerConfig} config - Configuration for probe server
|
||||||
|
* @returns {Promise<ProbeServer>} - Instance of ProbeServer
|
||||||
|
*/
|
||||||
|
async function startProbeServer(config) {
|
||||||
|
if (!config) {
|
||||||
|
throw new Error('configuration for probe server is missing');
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
const probeServer = new ProbeServer(config);
|
||||||
|
probeServer.onListening(() => resolve(probeServer));
|
||||||
|
probeServer.onError(err => reject(err));
|
||||||
|
probeServer.start();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
startProbeServer,
|
||||||
|
};
|
|
@ -0,0 +1,83 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const needle = require('needle');
|
||||||
|
const promClient = require('prom-client');
|
||||||
|
const sinon = require('sinon');
|
||||||
|
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
||||||
|
|
||||||
|
const { BaseTask } = require('../../../../libV2/tasks');
|
||||||
|
const { clients: warp10Clients } = require('../../../../libV2/warp10');
|
||||||
|
|
||||||
|
const { getMetricValues } = require('../../../utils/prom');
|
||||||
|
|
||||||
|
const METRICS_SERVER_PORT = 10999;
|
||||||
|
|
||||||
|
class CustomTask extends BaseTask {
|
||||||
|
// eslint-disable-next-line class-methods-use-this
|
||||||
|
_registerMetricHandlers() {
|
||||||
|
const foo = new promClient.Gauge({
|
||||||
|
name: 'utapi_custom_task_foo_total',
|
||||||
|
help: 'Count of foos',
|
||||||
|
labelNames: ['origin', 'containerName'],
|
||||||
|
});
|
||||||
|
|
||||||
|
return { foo };
|
||||||
|
}
|
||||||
|
|
||||||
|
async _execute() {
|
||||||
|
this._metricsHandlers.foo.inc(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Test BaseTask metrics', () => {
|
||||||
|
let task;
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
task = new CustomTask({
|
||||||
|
enableMetrics: true,
|
||||||
|
metricsPort: METRICS_SERVER_PORT,
|
||||||
|
warp10: [warp10Clients[0]],
|
||||||
|
});
|
||||||
|
await task.setup();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async () => {
|
||||||
|
task.join();
|
||||||
|
promClient.register.clear();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should start a metrics server on the provided port', async () => {
|
||||||
|
const res = await needle(
|
||||||
|
'get',
|
||||||
|
`http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
|
||||||
|
);
|
||||||
|
const lines = res.body.split('\n');
|
||||||
|
const first = lines[0];
|
||||||
|
assert.strictEqual(res.statusCode, 200);
|
||||||
|
assert(first.startsWith('# HELP'));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should push metrics for a task execution', async () => {
|
||||||
|
await task.execute();
|
||||||
|
const timeValues = await getMetricValues('utapi_custom_task_execution_seconds');
|
||||||
|
assert.strictEqual(timeValues.length, 1);
|
||||||
|
|
||||||
|
const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
|
||||||
|
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
|
||||||
|
|
||||||
|
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
||||||
|
assert.deepStrictEqual(failuresValues, []);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should push metrics for a failed task execution', async () => {
|
||||||
|
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
|
||||||
|
await task.execute();
|
||||||
|
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
||||||
|
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should allow custom handlers to be registered', async () => {
|
||||||
|
await task.execute();
|
||||||
|
const fooValues = await getMetricValues('utapi_custom_task_foo_total');
|
||||||
|
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,11 @@
|
||||||
|
const promClient = require('prom-client');
|
||||||
|
|
||||||
|
async function getMetricValues(name) {
|
||||||
|
const metric = await promClient.register.getSingleMetric(name);
|
||||||
|
const data = await metric.get();
|
||||||
|
return data.values;
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
getMetricValues,
|
||||||
|
};
|
Loading…
Reference in New Issue