Compare commits

..

No commits in common. "0b79f1e72636337b52794e057020352594f91e1d" and "c2f121d0d3a2fca0dc22627483d9f161f061684b" have entirely different histories.

15 changed files with 28 additions and 259 deletions

View File

@ -103,8 +103,7 @@ jobs:
test: test:
- name: run unit tests - name: run unit tests
command: yarn test command: yarn test
env: env: {}
UTAPI_METRICS_ENABLED: 'true'
- name: run client tests - name: run client tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
env: {} env: {}

View File

@ -19,10 +19,7 @@ class RedisCache {
moduleLogger.debug('Connecting to redis...'); moduleLogger.debug('Connecting to redis...');
this._redis = new RedisClient(this._options); this._redis = new RedisClient(this._options);
this._redis.connect(); this._redis.connect();
return new Promise((resolve, reject) => { return true;
this._redis.once('ready', () => resolve(true));
this._redis.once('error', error => reject(error));
});
} }
async disconnect() { async disconnect() {

View File

@ -3,7 +3,6 @@ const Joi = require('@hapi/joi');
const { buildModel } = require('./Base'); const { buildModel } = require('./Base');
const { apiOperations } = require('../server/spec'); const { apiOperations } = require('../server/spec');
const ResponseContainer = require('./ResponseContainer'); const ResponseContainer = require('./ResponseContainer');
const { httpRequestDurationSeconds } = require('../server/metrics');
const apiTags = Object.keys(apiOperations); const apiTags = Object.keys(apiOperations);
const apiOperationIds = Object.values(apiOperations) const apiOperationIds = Object.values(apiOperations)
@ -22,7 +21,6 @@ const contextSchema = {
logger: Joi.any(), logger: Joi.any(),
request: Joi.any(), request: Joi.any(),
results: Joi.any(), results: Joi.any(),
requestTimer: Joi.any(),
}; };
const RequestContextModel = buildModel('RequestContext', contextSchema); const RequestContextModel = buildModel('RequestContext', contextSchema);
@ -36,10 +34,6 @@ class RequestContext extends RequestContextModel {
const tag = request.swagger.operation['x-router-controller']; const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation; const { operationId } = request.swagger.operation;
const requestTimer = tag !== 'internal'
? httpRequestDurationSeconds.startTimer({ action: operationId })
: null;
request.logger.logger.addDefaultFields({ request.logger.logger.addDefaultFields({
tag, tag,
operationId, operationId,
@ -56,7 +50,6 @@ class RequestContext extends RequestContextModel {
encrypted, encrypted,
results: new ResponseContainer(), results: new ResponseContainer(),
logger: request.logger, logger: request.logger,
requestTimer,
}); });
} }

View File

@ -28,7 +28,6 @@ class UtapiServer extends Process {
app.use(middleware.loggerMiddleware); app.use(middleware.loggerMiddleware);
await initializeOasTools(spec, app); await initializeOasTools(spec, app);
app.use(middleware.errorMiddleware); app.use(middleware.errorMiddleware);
app.use(middleware.httpMetricsMiddleware);
app.use(middleware.responseLoggerMiddleware); app.use(middleware.responseLoggerMiddleware);
return app; return app;
} }

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 'utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 'utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s
buckets: [0.0001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0],
});
module.exports = {
httpRequestDurationSeconds,
httpRequestsTotal,
};

View File

@ -6,7 +6,6 @@ const config = require('../config');
const { logger, buildRequestLogger } = require('../utils'); const { logger, buildRequestLogger } = require('../utils');
const errors = require('../errors'); const errors = require('../errors');
const { translateAndAuthorize } = require('../vault'); const { translateAndAuthorize } = require('../vault');
const metricHandlers = require('./metrics');
const oasOptions = { const oasOptions = {
controllers: path.join(__dirname, './API/'), controllers: path.join(__dirname, './API/'),
@ -56,23 +55,6 @@ function responseLoggerMiddleware(req, res, next) {
} }
} }
function httpMetricsMiddleware(request, response, next) {
// If the request.ctx is undefined then this is an internal oasTools request (/_/docs)
// No metrics should be pushed
if (config.metrics.enabled && request.ctx && request.ctx.tag !== 'internal') {
metricHandlers.httpRequestsTotal
.labels({
action: request.ctx.operationId,
code: response.statusCode,
}).inc(1);
request.ctx.requestTimer({ code: response.statusCode });
}
if (next) {
next();
}
}
// next is purposely not called as all error responses are handled here // next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) { function errorMiddleware(err, req, res, next) {
@ -100,7 +82,7 @@ function errorMiddleware(err, req, res, next) {
code, code,
message, message,
}); });
responseLoggerMiddleware(req, res, () => httpMetricsMiddleware(req, res)); responseLoggerMiddleware(req, res);
} }
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
@ -176,6 +158,5 @@ module.exports = {
responseLoggerMiddleware, responseLoggerMiddleware,
authV4Middleware, authV4Middleware,
clientIpLimitMiddleware, clientIpLimitMiddleware,
httpMetricsMiddleware,
}, },
}; };

View File

@ -48,11 +48,6 @@ class BaseTask extends Process {
} }
if (this._enableMetrics) { if (this._enableMetrics) {
promClient.collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = { this._metricsHandlers = {
...this._registerDefaultMetricHandlers(), ...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(), ...this._registerMetricHandlers(),

View File

@ -29,26 +29,8 @@ class CreateCheckpoint extends BaseTask {
labelNames: ['origin', 'containerName'], labelNames: ['origin', 'containerName'],
}); });
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 'utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastCheckpoint();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return { return {
created, created,
lastCheckpoint,
}; };
} }
@ -73,25 +55,6 @@ class CreateCheckpoint extends BaseTask {
} }
} }
async _getLastCheckpoint() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.checkpoint.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (resp.result && (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) { async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp }); logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => { const status = await this.withWarp10(async warp10 => {

View File

@ -29,26 +29,8 @@ class CreateSnapshot extends BaseTask {
labelNames: ['origin', 'containerName'], labelNames: ['origin', 'containerName'],
}); });
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 'utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastSnapshot();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return { return {
created, created,
lastSnapshot,
}; };
} }
@ -73,25 +55,6 @@ class CreateSnapshot extends BaseTask {
} }
} }
async _getLastSnapshot() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (resp.result && (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) { async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp }); logger.debug('creating snapshots', { snapshotTimestamp: timestamp });

View File

@ -1,6 +1,4 @@
const async = require('async'); const async = require('async');
const Path = require('path');
const fs = require('fs');
const promClient = require('prom-client'); const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
@ -57,15 +55,9 @@ class MonitorDiskUsage extends BaseTask {
labelNames: ['origin', 'containerName'], labelNames: ['origin', 'containerName'],
}); });
const leveldbBytes = new promClient.Gauge({ const diskUsage = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_leveldb_bytes', name: 'utapi_monitor_disk_usage_bytes',
help: 'Total bytes used by warp 10 leveldb', help: 'Total bytes used by warp 10',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 'utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'], labelNames: ['origin', 'containerName'],
}); });
@ -83,8 +75,7 @@ class MonitorDiskUsage extends BaseTask {
return { return {
isLocked, isLocked,
leveldbBytes, diskUsage,
datalogBytes,
hardLimitRatio, hardLimitRatio,
hardLimitSetting, hardLimitSetting,
}; };
@ -94,8 +85,7 @@ class MonitorDiskUsage extends BaseTask {
* Metrics for MonitorDiskUsage * Metrics for MonitorDiskUsage
* @typedef {Object} MonitorDiskUsageMetrics * @typedef {Object} MonitorDiskUsageMetrics
* @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10 * @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10
* @property {number} leveldbBytes - Total bytes used by warp 10 leveldb * @property {number} diskUsage - Total bytes used by warp 10
* @property {number} datalogBytes - Total bytes used by warp 10 datalog
* @property {number} hardLimitRatio - Percent of the hard limit used by warp 10 * @property {number} hardLimitRatio - Percent of the hard limit used by warp 10
* @property {number} hardLimitSetting - The hard limit setting in bytes * @property {number} hardLimitSetting - The hard limit setting in bytes
*/ */
@ -114,12 +104,8 @@ class MonitorDiskUsage extends BaseTask {
this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0); this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0);
} }
if (metrics.leveldbBytes !== undefined) { if (metrics.diskUsage !== undefined) {
this._metricsHandlers.leveldbBytes.set(metrics.leveldbBytes); this._metricsHandlers.diskUsage.set(metrics.diskUsage);
}
if (metrics.datalogBytes !== undefined) {
this._metricsHandlers.datalogBytes.set(metrics.datalogBytes);
} }
if (metrics.hardLimitRatio !== undefined) { if (metrics.hardLimitRatio !== undefined) {
@ -143,13 +129,9 @@ class MonitorDiskUsage extends BaseTask {
return this._program.lock !== undefined; return this._program.lock !== undefined;
} }
// eslint-disable-next-line class-methods-use-this _getUsage() {
async _getUsage(path) { moduleLogger.debug(`calculating disk usage for ${this._path}`);
moduleLogger.debug(`calculating disk usage for ${path}`); return getFolderSize(this._path);
if (!fs.existsSync(path)) {
throw Error(`failed to calculate usage for non-existent path ${path}`);
}
return getFolderSize(path);
} }
async _expireMetrics(timestamp) { async _expireMetrics(timestamp) {
@ -267,19 +249,16 @@ class MonitorDiskUsage extends BaseTask {
return; return;
} }
let leveldbBytes = null; let size = null;
let datalogBytes = null;
try { try {
leveldbBytes = await this._getUsage(Path.join(this._path, 'leveldb')); size = await this._getUsage();
datalogBytes = await this._getUsage(Path.join(this._path, 'datalog'));
} catch (error) { } catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error }); moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return; return;
} }
this._pushMetrics({ leveldbBytes, datalogBytes }); this._pushMetrics({ diskUsage: size });
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) { if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size }); moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });

View File

@ -1,5 +1,4 @@
const assert = require('assert'); const assert = require('assert');
const fs = require('fs');
const sinon = require('sinon'); const sinon = require('sinon');
const uuid = require('uuid'); const uuid = require('uuid');
const promClient = require('prom-client'); const promClient = require('prom-client');
@ -18,7 +17,6 @@ describe('Test MonitorDiskUsage hard limit', function () {
beforeEach(async () => { beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`; path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(`${path}/datalog`, { recursive: true });
promClient.register.clear(); promClient.register.clear();
task = new MonitorDiskUsage({ warp10: warp10Clients, enableMetrics: true }); task = new MonitorDiskUsage({ warp10: warp10Clients, enableMetrics: true });
await task.setup(); await task.setup();
@ -29,7 +27,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
afterEach(async () => task.join()); afterEach(async () => task.join());
it('should trigger a database lock if above the limit', async () => { it('should trigger a database lock if above the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1; task._hardLimit = 1;
const checkSpy = sinon.spy(task, '_checkHardLimit'); const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates'); const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -45,7 +43,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
}); });
it('should trigger a database unlock if below the limit', async () => { it('should trigger a database unlock if below the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 10240; task._hardLimit = 10240;
const checkSpy = sinon.spy(task, '_checkHardLimit'); const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates'); const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -58,7 +56,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
}); });
it('should not throw when failing to calculate disk usage', async () => { it('should not throw when failing to calculate disk usage', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1; task._hardLimit = 1;
sinon.stub(task, '_getUsage').throws(); sinon.stub(task, '_getUsage').throws();
const _task = task.execute(); const _task = task.execute();

View File

@ -1,5 +1,4 @@
const assert = require('assert'); const assert = require('assert');
const fs = require('fs');
const promClient = require('prom-client'); const promClient = require('prom-client');
const uuid = require('uuid'); const uuid = require('uuid');
@ -10,10 +9,9 @@ const { fillDir } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom'); const { assertMetricValue } = require('../../../utils/prom');
class MonitorDiskUsageShim extends MonitorDiskUsage { class MonitorDiskUsageShim extends MonitorDiskUsage {
async _getUsage(path) { async _getUsage() {
const usage = await super._getUsage(path); this.usage = await super._getUsage();
this.usage = (this.usage || 0) + usage; return this.usage;
return usage;
} }
} }
@ -47,7 +45,6 @@ describe('Test MonitorDiskUsage', () => {
beforeEach(async () => { beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`; path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(path);
task = new MonitorDiskUsageShim({ warp10: [], enableMetrics: true }); task = new MonitorDiskUsageShim({ warp10: [], enableMetrics: true });
task._path = path; task._path = path;
task._enabled = true; task._enabled = true;
@ -59,23 +56,12 @@ describe('Test MonitorDiskUsage', () => {
promClient.register.clear(); promClient.register.clear();
}); });
testCases.map(testCase => { testCases.map(testCase =>
it(`should calculate disk usage for ${testCase.count} files of ${testCase.size} bytes each`, it(`should calculate disk usage for ${testCase.count} files of ${testCase.size} bytes each`,
async () => { async () => {
fillDir(`${path}/leveldb`, testCase); fillDir(path, testCase);
fillDir(`${path}/datalog`, testCase);
await task._execute(); await task._execute();
const expectedSingleSize = emptyDirSize + testCase.expected + (emptyFileSize * testCase.count); assert.strictEqual(task.usage, testCase.expected + emptyDirSize + (emptyFileSize * testCase.count));
const expectedTotalSize = expectedSingleSize * 2; await assertMetricValue('utapi_monitor_disk_usage_bytes', task.usage);
assert.strictEqual(task.usage, expectedTotalSize); }));
// Should equal the usage minus the empty datalog
await assertMetricValue('utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
await assertMetricValue('utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
});
});
it('should fail if a subpath does not exists', () => {
assert.doesNotThrow(() => task._execute());
assert.strictEqual(task.usage, undefined);
});
}); });

View File

@ -10,7 +10,6 @@ const templateExpected = opts => ({
operationId: 'healthcheck', operationId: 'healthcheck',
tag: 'internal', tag: 'internal',
encrypted: false, encrypted: false,
requestTimer: null,
...(opts || {}), ...(opts || {}),
}); });

View File

@ -1,11 +1,8 @@
const assert = require('assert'); const assert = require('assert');
const sinon = require('sinon'); const sinon = require('sinon');
const promClient = require('prom-client');
const { middleware } = require('../../../../libV2/server/middleware'); const { middleware } = require('../../../../libV2/server/middleware');
const { templateRequest, ExpressResponseStub } = require('../../../utils/v2Data'); const { templateRequest, ExpressResponseStub } = require('../../../utils/v2Data');
const RequestContext = require('../../../../libV2/models/RequestContext');
const { getMetricValues, assertMetricValue } = require('../../../utils/prom');
describe('Test middleware', () => { describe('Test middleware', () => {
it('should build a request logger', next => { it('should build a request logger', next => {
@ -89,55 +86,4 @@ describe('Test middleware', () => {
})); }));
}); });
}); });
describe('test httpMetricsMiddleware', () => {
let resp;
beforeEach(() => {
resp = new ExpressResponseStub();
resp.status(200);
});
afterEach(() => {
promClient.register.clear();
});
it('should increment the counter if not an internal route', async () => {
const req = templateRequest({
swagger: {
operation: {
'x-router-controller': 'metrics',
'operationId': 'listMetrics',
},
},
});
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
await assertMetricValue('utapi_http_requests_total', 1);
const duration = await getMetricValues('utapi_http_request_duration_seconds');
// 14 defined buckets + 1 for Infinity
assert.strictEqual(
duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_bucket').length,
15,
);
const count = duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_count');
assert.deepStrictEqual(count, [{
labels: {
action: 'listMetrics',
code: 200,
},
metricName: 'utapi_http_request_duration_seconds_count',
value: 1,
}]);
assert.strictEqual(count[0].value, 1);
});
it('should not increment the counter if an internal route', async () => {
const req = templateRequest();
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
assert.rejects(() => getMetricValues('utapi_http_requests_total'));
});
});
}); });

View File

@ -115,16 +115,7 @@
%> FOREACH %> FOREACH
%> %>
<% <%
// If no new events were found DROP 0 STOP
// - drop the empty list
// - write a new master checkpoint
// - return 0
DROP
NEWGTS $master_checkpoint_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
0 STOP
%> IFTE %> IFTE
0 'checkpoints' STORE 0 'checkpoints' STORE