Compare commits

...

6 Commits

8 changed files with 154 additions and 26 deletions

View File

@@ -19,7 +19,10 @@ class RedisCache {
moduleLogger.debug('Connecting to redis...');
this._redis = new RedisClient(this._options);
this._redis.connect();
return true;
return new Promise((resolve, reject) => {
this._redis.once('ready', () => resolve(true));
this._redis.once('error', error => reject(error));
});
}
async disconnect() {

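The connect() change above swaps the unconditional return true for a promise that settles on the client's first 'ready' or 'error' event, so callers can actually await the connection. A minimal sketch of that pattern in isolation (the helper name and the plain EventEmitter-style client are illustrative, not part of the PR):

    // Illustrative only: resolve once an EventEmitter-style redis client is ready,
    // reject if it emits an error first.
    function waitForConnection(client) {
        return new Promise((resolve, reject) => {
            client.once('ready', () => resolve(true));
            client.once('error', error => reject(error));
        });
    }

    // Usage sketch: await waitForConnection(this._redis);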
View File

@@ -48,6 +48,11 @@ class BaseTask extends Process {
}
if (this._enableMetrics) {
promClient.collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),

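For context, _registerMetricHandlers() is the per-task hook whose return value is merged with the defaults into this._metricsHandlers. A hedged sketch of what an override returns (the metric name here is hypothetical; the real ones appear in the CreateCheckpoint and CreateSnapshot hunks below):

    // Illustrative subclass hook: return an object of prom-client metrics keyed by name.
    _registerMetricHandlers() {
        const created = new promClient.Counter({
            name: 'utapi_example_task_created_total', // hypothetical metric name
            help: 'Total number of records created by this task',
            labelNames: ['origin', 'containerName'],
        });
        return { created };
    }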
View File

@@ -29,8 +29,26 @@ class CreateCheckpoint extends BaseTask {
labelNames: ['origin', 'containerName'],
});
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 'utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastCheckpoint();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastCheckpoint,
};
}
@@ -55,6 +73,25 @@ class CreateCheckpoint extends BaseTask {
}
}
async _getLastCheckpoint() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.checkpoint.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (resp.result && (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000; // Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {

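The lastCheckpoint gauge above relies on prom-client's scrape-time collect() hook: the async callback runs whenever the registry is scraped, so the value is refreshed lazily from Warp 10 rather than on a timer (the division by 1000000 converts Warp 10's microsecond timestamps to seconds). A minimal sketch of the hook on its own, assuming prom-client v13+ and a stand-in fetch function:

    // Illustrative only: a gauge recomputed on every scrape via collect().
    const lastRun = new promClient.Gauge({
        name: 'example_last_run_seconds', // hypothetical metric name
        help: 'Timestamp of the last successful run, in seconds',
        async collect() {
            const seconds = await fetchLastRunTimestamp(); // stand-in for _getLastCheckpoint()
            if (seconds !== null) {
                this.set(seconds);
            }
        },
    });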
View File

@@ -29,8 +29,26 @@ class CreateSnapshot extends BaseTask {
labelNames: ['origin', 'containerName'],
});
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 'utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastSnapshot();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastSnapshot,
};
}
@@ -55,6 +73,25 @@ class CreateSnapshot extends BaseTask {
}
}
async _getLastSnapshot() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (resp.result && (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000; // Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });

View File

@@ -1,4 +1,6 @@
const async = require('async');
const Path = require('path');
const fs = require('fs');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
@@ -55,9 +57,15 @@ class MonitorDiskUsage extends BaseTask {
labelNames: ['origin', 'containerName'],
});
const diskUsage = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_bytes',
help: 'Total bytes used by warp 10',
const leveldbBytes = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10 leveldb',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'],
});
@@ -75,7 +83,8 @@ class MonitorDiskUsage extends BaseTask {
return {
isLocked,
diskUsage,
leveldbBytes,
datalogBytes,
hardLimitRatio,
hardLimitSetting,
};
@@ -85,7 +94,8 @@ class MonitorDiskUsage extends BaseTask {
* Metrics for MonitorDiskUsage
* @typedef {Object} MonitorDiskUsageMetrics
* @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10
* @property {number} diskUsage - Total bytes used by warp 10
* @property {number} leveldbBytes - Total bytes used by warp 10 leveldb
* @property {number} datalogBytes - Total bytes used by warp 10 datalog
* @property {number} hardLimitRatio - Percent of the hard limit used by warp 10
* @property {number} hardLimitSetting - The hard limit setting in bytes
*/
@@ -104,8 +114,12 @@ class MonitorDiskUsage extends BaseTask {
this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0);
}
if (metrics.diskUsage !== undefined) {
this._metricsHandlers.diskUsage.set(metrics.diskUsage);
if (metrics.leveldbBytes !== undefined) {
this._metricsHandlers.leveldbBytes.set(metrics.leveldbBytes);
}
if (metrics.datalogBytes !== undefined) {
this._metricsHandlers.datalogBytes.set(metrics.datalogBytes);
}
if (metrics.hardLimitRatio !== undefined) {
@@ -129,9 +143,13 @@ class MonitorDiskUsage extends BaseTask {
return this._program.lock !== undefined;
}
_getUsage() {
moduleLogger.debug(`calculating disk usage for ${this._path}`);
return getFolderSize(this._path);
// eslint-disable-next-line class-methods-use-this
async _getUsage(path) {
moduleLogger.debug(`calculating disk usage for ${path}`);
if (!fs.existsSync(path)) {
throw Error(`failed to calculate usage for non-existent path ${path}`);
}
return getFolderSize(path);
}
async _expireMetrics(timestamp) {
@@ -249,16 +267,19 @@ class MonitorDiskUsage extends BaseTask {
return;
}
let size = null;
let leveldbBytes = null;
let datalogBytes = null;
try {
size = await this._getUsage();
leveldbBytes = await this._getUsage(Path.join(this._path, 'leveldb'));
datalogBytes = await this._getUsage(Path.join(this._path, 'datalog'));
} catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return;
}
this._pushMetrics({ diskUsage: size });
this._pushMetrics({ leveldbBytes, datalogBytes });
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });

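Note that the old utapi_monitor_disk_usage_bytes gauge is gone: consumers that relied on a single total now have to add the leveldb and datalog series back together (in PromQL that is simply the sum of the two gauges). A hedged sketch of recovering the total from the default registry in code, assuming prom-client v13+ where metric.get() is async; the helper name is illustrative:

    // Illustrative only: reproduce the old total by summing the two new gauges.
    async function totalWarp10DiskUsageBytes() {
        const names = [
            'utapi_monitor_disk_usage_leveldb_bytes',
            'utapi_monitor_disk_usage_datalog_bytes',
        ];
        let total = 0;
        for (const name of names) {
            const metric = promClient.register.getSingleMetric(name);
            if (metric) {
                const { values } = await metric.get();
                total += values.reduce((sum, v) => sum + v.value, 0);
            }
        }
        return total;
    }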
View File

@@ -1,4 +1,5 @@
const assert = require('assert');
const fs = require('fs');
const sinon = require('sinon');
const uuid = require('uuid');
const promClient = require('prom-client');
@@ -17,6 +18,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(`${path}/datalog`, { recursive: true });
promClient.register.clear();
task = new MonitorDiskUsage({ warp10: warp10Clients, enableMetrics: true });
await task.setup();
@@ -27,7 +29,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
afterEach(async () => task.join());
it('should trigger a database lock if above the limit', async () => {
fillDir(path, { count: 1, size: 100 });
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
task._hardLimit = 1;
const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@@ -43,7 +45,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
});
it('should trigger a database unlock if below the limit', async () => {
fillDir(path, { count: 1, size: 100 });
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
task._hardLimit = 10240;
const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@@ -56,7 +58,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
});
it('should not throw when failing to calculate disk usage', async () => {
fillDir(path, { count: 1, size: 100 });
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
task._hardLimit = 1;
sinon.stub(task, '_getUsage').throws();
const _task = task.execute();

View File

@@ -1,4 +1,5 @@
const assert = require('assert');
const fs = require('fs');
const promClient = require('prom-client');
const uuid = require('uuid');
@@ -9,9 +10,10 @@ const { fillDir } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
class MonitorDiskUsageShim extends MonitorDiskUsage {
async _getUsage() {
this.usage = await super._getUsage();
return this.usage;
async _getUsage(path) {
const usage = await super._getUsage(path);
this.usage = (this.usage || 0) + usage;
return usage;
}
}
@@ -45,6 +47,7 @@ describe('Test MonitorDiskUsage', () => {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(path);
task = new MonitorDiskUsageShim({ warp10: [], enableMetrics: true });
task._path = path;
task._enabled = true;
@@ -56,12 +59,23 @@ describe('Test MonitorDiskUsage', () => {
promClient.register.clear();
});
testCases.map(testCase =>
testCases.map(testCase => {
it(`should calculate disk usage for ${testCase.count} files of ${testCase.size} bytes each`,
async () => {
fillDir(path, testCase);
fillDir(`${path}/leveldb`, testCase);
fillDir(`${path}/datalog`, testCase);
await task._execute();
assert.strictEqual(task.usage, testCase.expected + emptyDirSize + (emptyFileSize * testCase.count));
await assertMetricValue('utapi_monitor_disk_usage_bytes', task.usage);
}));
const expectedSingleSize = emptyDirSize + testCase.expected + (emptyFileSize * testCase.count);
const expectedTotalSize = expectedSingleSize * 2;
assert.strictEqual(task.usage, expectedTotalSize);
// Should equal the usage minus the empty datalog
await assertMetricValue('utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
await assertMetricValue('utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
});
});
it('should fail if a subpath does not exist', () => {
assert.doesNotThrow(() => task._execute());
assert.strictEqual(task.usage, undefined);
});
});

View File

@@ -115,7 +115,16 @@
%> FOREACH
%>
<%
DROP 0 STOP
// If no new events were found
// - drop the empty list
// - write a new master checkpoint
// - return 0
DROP
NEWGTS $master_checkpoint_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
0 STOP
%> IFTE
0 'checkpoints' STORE