Compare commits
No commits in common. "88982fffe69f00fdbe147328118fbc57e3a66380" and "7dd49ca4188c2620b7ee0078bd38527b1c3c06ab" have entirely different histories.
88982fffe6
...
7dd49ca418
|
@ -19,7 +19,7 @@ x-models:
|
||||||
warpscript.maxrecursion: 1000
|
warpscript.maxrecursion: 1000
|
||||||
warpscript.repository.directory: /usr/local/share/warpscript
|
warpscript.repository.directory: /usr/local/share/warpscript
|
||||||
warpscript.extension.logEvent: io.warp10.script.ext.logging.LoggingWarpScriptExtension
|
warpscript.extension.logEvent: io.warp10.script.ext.logging.LoggingWarpScriptExtension
|
||||||
|
|
||||||
redis: &redis
|
redis: &redis
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
|
@ -27,23 +27,23 @@ x-models:
|
||||||
|
|
||||||
services:
|
services:
|
||||||
redis-0:
|
redis-0:
|
||||||
image: redis:6
|
image: redis:5
|
||||||
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||||
ports:
|
ports:
|
||||||
- 6379:6379
|
- 6379:6379
|
||||||
environment:
|
environment:
|
||||||
- HOST_IP="${EXTERNAL_HOST}"
|
- HOST_IP="${EXTERNAL_HOST}"
|
||||||
|
|
||||||
redis-1:
|
redis-1:
|
||||||
image: redis:6
|
image: redis:5
|
||||||
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||||
ports:
|
ports:
|
||||||
- 6380:6380
|
- 6380:6380
|
||||||
environment:
|
environment:
|
||||||
- HOST_IP="${EXTERNAL_HOST}"
|
- HOST_IP="${EXTERNAL_HOST}"
|
||||||
|
|
||||||
redis-sentinel-0:
|
redis-sentinel-0:
|
||||||
image: redis:6
|
image: redis:5
|
||||||
command: |-
|
command: |-
|
||||||
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
||||||
port 16379
|
port 16379
|
||||||
|
@ -55,7 +55,7 @@ services:
|
||||||
EOF
|
EOF
|
||||||
redis-sentinel /tmp/sentinel.conf'
|
redis-sentinel /tmp/sentinel.conf'
|
||||||
|
|
||||||
environment:
|
environment:
|
||||||
- HOST_IP="${EXTERNAL_HOST}"
|
- HOST_IP="${EXTERNAL_HOST}"
|
||||||
ports:
|
ports:
|
||||||
- 16379:16379
|
- 16379:16379
|
||||||
|
|
|
@ -54,16 +54,5 @@
|
||||||
"filter": {
|
"filter": {
|
||||||
"allow": {},
|
"allow": {},
|
||||||
"deny": {}
|
"deny": {}
|
||||||
},
|
|
||||||
"metrics" : {
|
|
||||||
"enabled": false,
|
|
||||||
"host": "localhost",
|
|
||||||
"serverPort": 10901,
|
|
||||||
"ingestPort": 10902,
|
|
||||||
"checkpointPort": 10903,
|
|
||||||
"snapshotPort": 10904,
|
|
||||||
"diskUsagePort": 10905,
|
|
||||||
"reindexPort": 10906,
|
|
||||||
"repairPort": 10907
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -375,18 +375,6 @@ class Config {
|
||||||
|
|
||||||
parsedConfig.filter = Config._parseResourceFilters(config.filter);
|
parsedConfig.filter = Config._parseResourceFilters(config.filter);
|
||||||
|
|
||||||
parsedConfig.metrics = {
|
|
||||||
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
|
|
||||||
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
|
|
||||||
serverPort: _loadFromEnv('METRICS_PORT_SERVER', config.metrics.serverPort, _typeCasts.int),
|
|
||||||
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
|
|
||||||
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
|
|
||||||
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
|
|
||||||
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
|
|
||||||
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
|
|
||||||
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
|
|
||||||
};
|
|
||||||
|
|
||||||
return parsedConfig;
|
return parsedConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,17 +115,7 @@ const schema = Joi.object({
|
||||||
return filterObj;
|
return filterObj;
|
||||||
}, {},
|
}, {},
|
||||||
)),
|
)),
|
||||||
metrics: {
|
|
||||||
enabled: Joi.boolean(),
|
|
||||||
host: Joi.string(),
|
|
||||||
serverPort: Joi.number().port(),
|
|
||||||
ingestPort: Joi.number().port(),
|
|
||||||
checkpointPort: Joi.number().port(),
|
|
||||||
snapshotPort: Joi.number().port(),
|
|
||||||
diskUsagePort: Joi.number().port(),
|
|
||||||
reindexPort: Joi.number().port(),
|
|
||||||
repairPort: Joi.number().port(),
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
|
|
||||||
module.exports = schema;
|
module.exports = schema;
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ async function listMetric(ctx, params) {
|
||||||
|
|
||||||
const metric = {
|
const metric = {
|
||||||
...result.metrics,
|
...result.metrics,
|
||||||
timeRange: [start, end],
|
timeRange: [ start, end ],
|
||||||
operations: {
|
operations: {
|
||||||
...emptyOperationsResponse,
|
...emptyOperationsResponse,
|
||||||
...operations,
|
...operations,
|
||||||
|
|
|
@ -1,12 +1,10 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const cron = require('node-schedule');
|
const cron = require('node-schedule');
|
||||||
const cronparser = require('cron-parser');
|
const cronparser = require('cron-parser');
|
||||||
const promClient = require('prom-client');
|
|
||||||
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
|
||||||
|
|
||||||
const { client: cacheClient } = require('../cache');
|
const { client: cacheClient } = require('../cache');
|
||||||
const Process = require('../process');
|
const Process = require('../process');
|
||||||
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
|
const { LoggerContext, iterIfError } = require('../utils');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
module: 'BaseTask',
|
module: 'BaseTask',
|
||||||
|
@ -24,11 +22,6 @@ class BaseTask extends Process {
|
||||||
this._scheduler = null;
|
this._scheduler = null;
|
||||||
this._defaultSchedule = Now;
|
this._defaultSchedule = Now;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
this._enableMetrics = options.enableMetrics || false;
|
|
||||||
this._metricsHost = options.metricsHost || 'localhost';
|
|
||||||
this._metricsPort = options.metricsPort || 9001;
|
|
||||||
this._metricsHandlers = null;
|
|
||||||
this._probeServer = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async _setup(includeDefaultOpts = true) {
|
async _setup(includeDefaultOpts = true) {
|
||||||
|
@ -46,70 +39,6 @@ class BaseTask extends Process {
|
||||||
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||||
.option('-n, --node-id <id>', 'Set a custom node id');
|
.option('-n, --node-id <id>', 'Set a custom node id');
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this._enableMetrics) {
|
|
||||||
this._metricsHandlers = {
|
|
||||||
...this._registerDefaultMetricHandlers(),
|
|
||||||
...this._registerMetricHandlers(),
|
|
||||||
};
|
|
||||||
await this._createProbeServer();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_registerDefaultMetricHandlers() {
|
|
||||||
const taskName = this.constructor.name;
|
|
||||||
|
|
||||||
// Get the name of our subclass in snake case format eg BaseClass => _base_class
|
|
||||||
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
|
|
||||||
|
|
||||||
const executionDuration = new promClient.Gauge({
|
|
||||||
name: `utapi${taskNameSnake}_duration_seconds`,
|
|
||||||
help: `Execution time of the ${taskName} task`,
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const executionAttempts = new promClient.Counter({
|
|
||||||
name: `utapi${taskNameSnake}_attempts_total`,
|
|
||||||
help: `Number of attempts to execute the ${taskName} task`,
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const executionFailures = new promClient.Counter({
|
|
||||||
name: `utapi${taskNameSnake}_failures_total`,
|
|
||||||
help: `Number of failures executing the ${taskName} task`,
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
executionDuration,
|
|
||||||
executionAttempts,
|
|
||||||
executionFailures,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
return {};
|
|
||||||
}
|
|
||||||
|
|
||||||
async _createProbeServer() {
|
|
||||||
this._probeServer = await startProbeServer({
|
|
||||||
host: this._metricsHost,
|
|
||||||
port: this._metricsPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._probeServer.addHandler(
|
|
||||||
DEFAULT_METRICS_ROUTE,
|
|
||||||
(res, log) => {
|
|
||||||
log.debug('metrics requested');
|
|
||||||
res.writeHead(200, {
|
|
||||||
'Content-Type': promClient.register.contentType,
|
|
||||||
});
|
|
||||||
promClient.register.metrics().then(metrics => {
|
|
||||||
res.end(metrics);
|
|
||||||
});
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get schedule() {
|
get schedule() {
|
||||||
|
@ -150,23 +79,12 @@ class BaseTask extends Process {
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute() {
|
async execute() {
|
||||||
let endTimer;
|
|
||||||
if (this._enableMetrics) {
|
|
||||||
endTimer = this._metricsHandlers.executionDuration.startTimer();
|
|
||||||
this._metricsHandlers.executionAttempts.inc(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const timestamp = new Date() * 1000; // Timestamp in microseconds;
|
const timestamp = new Date() * 1000; // Timestamp in microseconds;
|
||||||
const laggedTimestamp = timestamp - (this.lag * 1000000);
|
const laggedTimestamp = timestamp - (this.lag * 1000000);
|
||||||
await this._execute(laggedTimestamp);
|
await this._execute(laggedTimestamp);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error('Error during task execution', { error });
|
logger.error('Error during task execution', { error });
|
||||||
this._metricsHandlers.executionFailures.inc(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this._enableMetrics) {
|
|
||||||
endTimer();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,9 +94,6 @@ class BaseTask extends Process {
|
||||||
}
|
}
|
||||||
|
|
||||||
async _join() {
|
async _join() {
|
||||||
if (this._probeServer !== null) {
|
|
||||||
this._probeServer.stop();
|
|
||||||
}
|
|
||||||
return this._cache.disconnect();
|
return this._cache.disconnect();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
const promClient = require('prom-client');
|
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
const { checkpointLagSecs, indexedEventFields } = require('../constants');
|
||||||
|
@ -10,30 +9,11 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class CreateCheckpoint extends BaseTask {
|
class CreateCheckpoint extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(options);
|
||||||
...options,
|
|
||||||
enableMetrics: config.metrics.enabled,
|
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.checkpointPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.checkpointSchedule;
|
this._defaultSchedule = config.checkpointSchedule;
|
||||||
this._defaultLag = checkpointLagSecs;
|
this._defaultLag = checkpointLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const created = new promClient.Counter({
|
|
||||||
name: 'utapi_create_checkpoint_created_total',
|
|
||||||
help: 'Number of checkpoints created',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
created,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
|
||||||
const status = await this.withWarp10(async warp10 => {
|
const status = await this.withWarp10(async warp10 => {
|
||||||
|
@ -49,7 +29,6 @@ class CreateCheckpoint extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
logger.info(`created ${status.result[0] || 0} checkpoints`);
|
||||||
this._metricsHandlers.created.inc(status.result[0]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
const promClient = require('prom-client');
|
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { snapshotLagSecs } = require('../constants');
|
const { snapshotLagSecs } = require('../constants');
|
||||||
|
@ -10,30 +9,11 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class CreateSnapshot extends BaseTask {
|
class CreateSnapshot extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(options);
|
||||||
...options,
|
|
||||||
enableMetrics: config.metrics.enabled,
|
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.snapshotPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.snapshotSchedule;
|
this._defaultSchedule = config.snapshotSchedule;
|
||||||
this._defaultLag = snapshotLagSecs;
|
this._defaultLag = snapshotLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const created = new promClient.Counter({
|
|
||||||
name: 'utapi_create_snapshot_created_total',
|
|
||||||
help: 'Number of snapshots created',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
created,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
|
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
|
||||||
|
|
||||||
|
@ -49,7 +29,6 @@ class CreateSnapshot extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0]} snapshots`);
|
logger.info(`created ${status.result[0]} snapshots`);
|
||||||
this._metricsHandlers.created.inc(status.result[0]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,4 @@
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const Path = require('path');
|
|
||||||
const promClient = require('prom-client');
|
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { expirationChunkDuration } = require('../constants');
|
const { expirationChunkDuration } = require('../constants');
|
||||||
|
@ -18,13 +16,9 @@ const ACTION_THRESHOLD = 0.95;
|
||||||
|
|
||||||
class MonitorDiskUsage extends BaseTask {
|
class MonitorDiskUsage extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(
|
||||||
...options,
|
options,
|
||||||
enableMetrics: config.metrics.enabled,
|
);
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.diskUsagePort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.diskUsageSchedule;
|
this._defaultSchedule = config.diskUsageSchedule;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
this._path = config.diskUsage.path;
|
this._path = config.diskUsage.path;
|
||||||
|
@ -48,39 +42,6 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const isLocked = new promClient.Gauge({
|
|
||||||
name: 'utapi_monitor_disk_usage_is_locked',
|
|
||||||
help: 'Indicates whether the monitored warp 10 has had writes disabled',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const leveldbBytes = new promClient.Gauge({
|
|
||||||
name: 'utapi_monitor_disk_usage_leveldb_bytes',
|
|
||||||
help: 'Total bytes used by warp 10',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const datalogBytes = new promClient.Gauge({
|
|
||||||
name: 'utapi_monitor_disk_usage_datalog_bytes',
|
|
||||||
help: 'Total bytes used by warp 10',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const hardLimitRatio = new promClient.Gauge({
|
|
||||||
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
|
|
||||||
help: 'Percent of the hard limit used by warp 10',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
isLocked,
|
|
||||||
leveldbBytes,
|
|
||||||
datalogBytes,
|
|
||||||
hardLimitRatio,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
get isLeader() {
|
get isLeader() {
|
||||||
return this._program.leader !== undefined;
|
return this._program.leader !== undefined;
|
||||||
}
|
}
|
||||||
|
@ -93,9 +54,9 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
return this._program.lock !== undefined;
|
return this._program.lock !== undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
_getUsage(path) {
|
_getUsage() {
|
||||||
moduleLogger.debug(`calculating disk usage for ${path}`);
|
moduleLogger.debug(`calculating disk usage for ${this._path}`);
|
||||||
return getFolderSize(path);
|
return getFolderSize(this._path);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _expireMetrics(timestamp) {
|
async _expireMetrics(timestamp) {
|
||||||
|
@ -152,8 +113,6 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
nodeId,
|
nodeId,
|
||||||
});
|
});
|
||||||
|
|
||||||
this._metricsHandlers.hardLimitRatio.set(hardPercentage)
|
|
||||||
|
|
||||||
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
|
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
|
||||||
|
|
||||||
if (hardPercentage < WARN_THRESHOLD) {
|
if (hardPercentage < WARN_THRESHOLD) {
|
||||||
|
@ -191,14 +150,12 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
if (this.isManualUnlock) {
|
if (this.isManualUnlock) {
|
||||||
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
|
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
|
||||||
await this._enableWarp10Updates();
|
await this._enableWarp10Updates();
|
||||||
this._metricsHandlers.isLocked.set(0)
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.isManualLock) {
|
if (this.isManualLock) {
|
||||||
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
|
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
|
||||||
await this._disableWarp10Updates();
|
await this._disableWarp10Updates();
|
||||||
this._metricsHandlers.isLocked.set(1)
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,20 +170,14 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
let leveldbBytes = null;
|
let size = null;
|
||||||
let datalogBytes = null
|
|
||||||
try {
|
try {
|
||||||
leveldbBytes = await this._getUsage(Path.join(this._path, 'data', 'leveldb'));
|
size = await this._getUsage();
|
||||||
datalogBytes = await this._getUsage(Path.join(this._path, 'data', 'datalog'));
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
|
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this._metricsHandlers.leveldbBytes.set(leveldbBytes);
|
|
||||||
this._metricsHandlers.datalogBytes.set(datalogBytes);
|
|
||||||
|
|
||||||
const size = leveldbBytes + datalogBytes;
|
|
||||||
if (this._hardLimit !== null) {
|
if (this._hardLimit !== null) {
|
||||||
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
|
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
|
||||||
|
|
||||||
|
@ -234,12 +185,10 @@ class MonitorDiskUsage extends BaseTask {
|
||||||
if (shouldLock) {
|
if (shouldLock) {
|
||||||
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
|
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
|
||||||
await this._disableWarp10Updates();
|
await this._disableWarp10Updates();
|
||||||
this._metricsHandlers.isLocked.set(1)
|
|
||||||
} else {
|
} else {
|
||||||
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
|
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
|
||||||
{ nodeId: this.nodeId });
|
{ nodeId: this.nodeId });
|
||||||
await this._enableWarp10Updates();
|
await this._enableWarp10Updates();
|
||||||
this._metricsHandlers.isLocked.set(0)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const promClient = require('prom-client');
|
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const { UtapiMetric } = require('../models');
|
const { UtapiMetric } = require('../models');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
|
@ -17,38 +16,12 @@ const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||||
|
|
||||||
class IngestShardTask extends BaseTask {
|
class IngestShardTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(options);
|
||||||
...options,
|
|
||||||
enableMetrics: config.metrics.enabled,
|
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.ingestPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.ingestionSchedule;
|
this._defaultSchedule = config.ingestionSchedule;
|
||||||
this._defaultLag = config.ingestionLagSeconds;
|
this._defaultLag = config.ingestionLagSeconds;
|
||||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const ingestedTotal = new promClient.Counter({
|
|
||||||
name: 'utapi_ingest_shard_task_ingest_total',
|
|
||||||
help: 'Number of metrics ingested',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
const ingestedSlow = new promClient.Counter({
|
|
||||||
name: 'utapi_ingest_shard_task_slow_total',
|
|
||||||
help: 'Number of slow metrics ingested',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
ingestedTotal,
|
|
||||||
ingestedSlow,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
_hydrateEvent(data, stripTimestamp = false) {
|
_hydrateEvent(data, stripTimestamp = false) {
|
||||||
const event = JSON.parse(data);
|
const event = JSON.parse(data);
|
||||||
if (this._stripEventUUID) {
|
if (this._stripEventUUID) {
|
||||||
|
@ -111,11 +84,6 @@ class IngestShardTask extends BaseTask {
|
||||||
assert.strictEqual(status, records.length);
|
assert.strictEqual(status, records.length);
|
||||||
await this._cache.deleteShard(shard);
|
await this._cache.deleteShard(shard);
|
||||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||||
|
|
||||||
this._metricsHandlers.ingestedTotal.inc(records.length);
|
|
||||||
if (areSlowEvents) {
|
|
||||||
this._metricsHandlers.ingestedSlow.inc(records.length);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logger.debug('No events found in shard, cleaning up');
|
logger.debug('No events found in shard, cleaning up');
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,13 +20,7 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class ReindexTask extends BaseTask {
|
class ReindexTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(options);
|
||||||
...options,
|
|
||||||
enableMetrics: config.metrics.enabled,
|
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.reindexPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.reindexSchedule;
|
this._defaultSchedule = config.reindexSchedule;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
const eventFilters = (config && config.filter) || {};
|
const eventFilters = (config && config.filter) || {};
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
const promClient = require('prom-client');
|
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const config = require('../config');
|
const config = require('../config');
|
||||||
const { LoggerContext } = require('../utils');
|
const { LoggerContext } = require('../utils');
|
||||||
|
@ -10,30 +9,11 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class RepairTask extends BaseTask {
|
class RepairTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(options) {
|
||||||
super({
|
super(options);
|
||||||
...options,
|
|
||||||
enableMetrics: config.metrics.enabled,
|
|
||||||
metricsHost: config.metrics.host,
|
|
||||||
metricsPort: config.metrics.repairPort,
|
|
||||||
});
|
|
||||||
|
|
||||||
this._defaultSchedule = config.repairSchedule;
|
this._defaultSchedule = config.repairSchedule;
|
||||||
this._defaultLag = repairLagSecs;
|
this._defaultLag = repairLagSecs;
|
||||||
}
|
}
|
||||||
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const created = new promClient.Counter({
|
|
||||||
name: 'utapi_repair_task_created_total',
|
|
||||||
help: 'Number of repair records created',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return {
|
|
||||||
created,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute(timestamp) {
|
async _execute(timestamp) {
|
||||||
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
|
||||||
|
|
||||||
|
@ -50,7 +30,6 @@ class RepairTask extends BaseTask {
|
||||||
});
|
});
|
||||||
if (status.result[0]) {
|
if (status.result[0]) {
|
||||||
logger.info(`created ${status.result[0]} corrections`);
|
logger.info(`created ${status.result[0]} corrections`);
|
||||||
this._metricsHandlers.created.inc(status.result[0]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ const timestamp = require('./timestamp');
|
||||||
const func = require('./func');
|
const func = require('./func');
|
||||||
const disk = require('./disk');
|
const disk = require('./disk');
|
||||||
const filter = require('./filter');
|
const filter = require('./filter');
|
||||||
const probe = require('./probe');
|
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
...log,
|
...log,
|
||||||
|
@ -13,5 +12,4 @@ module.exports = {
|
||||||
...func,
|
...func,
|
||||||
...disk,
|
...disk,
|
||||||
...filter,
|
...filter,
|
||||||
...probe,
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,32 +0,0 @@
|
||||||
|
|
||||||
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Configure probe servers
|
|
||||||
* @typedef {Object} ProbeServerConfig
|
|
||||||
* @property {string} bindAddress - Address to bind probe server to
|
|
||||||
* @property {number} port - Port to bind probe server to
|
|
||||||
*/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start an empty probe server
|
|
||||||
* @async
|
|
||||||
* @param {ProbeServerConfig} config - Configuration for probe server
|
|
||||||
* @returns {Promise<ProbeServer>} - Instance of ProbeServer
|
|
||||||
*/
|
|
||||||
async function startProbeServer(config) {
|
|
||||||
if (!config) {
|
|
||||||
throw new Error('configuration for probe server is missing');
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
const probeServer = new ProbeServer(config);
|
|
||||||
probeServer.onListening(() => resolve(probeServer));
|
|
||||||
probeServer.onError(err => reject(err));
|
|
||||||
probeServer.start();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
startProbeServer,
|
|
||||||
};
|
|
|
@ -1,83 +0,0 @@
|
||||||
const assert = require('assert');
|
|
||||||
const needle = require('needle');
|
|
||||||
const promClient = require('prom-client');
|
|
||||||
const sinon = require('sinon');
|
|
||||||
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
|
|
||||||
|
|
||||||
const { BaseTask } = require('../../../../libV2/tasks');
|
|
||||||
const { clients: warp10Clients } = require('../../../../libV2/warp10');
|
|
||||||
|
|
||||||
const { getMetricValues } = require('../../../utils/prom');
|
|
||||||
|
|
||||||
const METRICS_SERVER_PORT = 10999;
|
|
||||||
|
|
||||||
class CustomTask extends BaseTask {
|
|
||||||
// eslint-disable-next-line class-methods-use-this
|
|
||||||
_registerMetricHandlers() {
|
|
||||||
const foo = new promClient.Gauge({
|
|
||||||
name: 'utapi_custom_task_foo_total',
|
|
||||||
help: 'Count of foos',
|
|
||||||
labelNames: ['origin', 'containerName'],
|
|
||||||
});
|
|
||||||
|
|
||||||
return { foo };
|
|
||||||
}
|
|
||||||
|
|
||||||
async _execute() {
|
|
||||||
this._metricsHandlers.foo.inc(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
describe('Test BaseTask metrics', () => {
|
|
||||||
let task;
|
|
||||||
|
|
||||||
beforeEach(async () => {
|
|
||||||
task = new CustomTask({
|
|
||||||
enableMetrics: true,
|
|
||||||
metricsPort: METRICS_SERVER_PORT,
|
|
||||||
warp10: [warp10Clients[0]],
|
|
||||||
});
|
|
||||||
await task.setup();
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async () => {
|
|
||||||
task.join();
|
|
||||||
promClient.register.clear();
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should start a metrics server on the provided port', async () => {
|
|
||||||
const res = await needle(
|
|
||||||
'get',
|
|
||||||
`http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
|
|
||||||
);
|
|
||||||
const lines = res.body.split('\n');
|
|
||||||
const first = lines[0];
|
|
||||||
assert.strictEqual(res.statusCode, 200);
|
|
||||||
assert(first.startsWith('# HELP'));
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should push metrics for a task execution', async () => {
|
|
||||||
await task.execute();
|
|
||||||
const timeValues = await getMetricValues('utapi_custom_task_execution_seconds');
|
|
||||||
assert.strictEqual(timeValues.length, 1);
|
|
||||||
|
|
||||||
const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
|
|
||||||
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
|
|
||||||
|
|
||||||
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
|
||||||
assert.deepStrictEqual(failuresValues, []);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should push metrics for a failed task execution', async () => {
|
|
||||||
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
|
|
||||||
await task.execute();
|
|
||||||
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
|
||||||
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should allow custom handlers to be registered', async () => {
|
|
||||||
await task.execute();
|
|
||||||
const fooValues = await getMetricValues('utapi_custom_task_foo_total');
|
|
||||||
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
|
|
||||||
});
|
|
||||||
});
|
|
|
@ -1,11 +0,0 @@
|
||||||
const promClient = require('prom-client');
|
|
||||||
|
|
||||||
async function getMetricValues(name) {
|
|
||||||
const metric = await promClient.register.getSingleMetric(name);
|
|
||||||
const data = await metric.get();
|
|
||||||
return data.values;
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
|
||||||
getMetricValues,
|
|
||||||
};
|
|
Loading…
Reference in New Issue