Compare commits

...

10 Commits

Author SHA1 Message Date
Bennett Buchanan da847d712e bugfix: ZENKO-1144 Update route and private method 2018-10-24 11:39:38 -07:00
philipyoo effaaa3306 bf: ZENKO-1144 fix ttl of sorted set expires
Changes in this commit:
- Fix TTL Redis#expire from ms to secs
2018-10-24 11:38:16 -07:00
philipyoo 23d6b7a431 bf: ZENKO-1144 add sorted set support StatsModel
partial backport
2018-10-24 11:30:42 -07:00
philipyoo 129a8371b4 fix 2018-10-24 10:52:15 -07:00
philipyoo e595eb8cdc partial backport - add pending counters 2018-10-24 09:35:52 -07:00
philipyoo d8dfe1cd9c partial: test updates 2018-10-23 09:56:36 -07:00
philipyoo a622084e26 bf: zero-fill response for getAllStats 2018-10-22 08:58:31 -07:00
philipyoo ea170d7637 fix: do not crash on empty backbeat stats 2018-10-22 08:49:11 -07:00
philipyoo 310051cda6 StatsModel add normalizeTimestamp 2018-10-22 08:43:56 -07:00
philipyoo d06291cd5f bump 2018-10-21 13:43:17 -07:00
5 changed files with 522 additions and 29 deletions

View File

@ -57,6 +57,38 @@ class RedisClient {
.exec(cb);
}
/**
* increment value of a key by a given amount
* @param {string} key - key holding the value
* @param {number} amount - amount to increase by
* @param {callback} cb - callback
* @return {undefined}
*/
incrby(key, amount, cb) {
return this._client.incrby(key, amount, cb);
}
/**
* decrement value of a key by a given amount
* @param {string} key - key holding the value
* @param {number} amount - amount to increase by
* @param {callback} cb - callback
* @return {undefined}
*/
decrby(key, amount, cb) {
return this._client.decrby(key, amount, cb);
}
/**
* get value stored at key
* @param {string} key - key holding the value
* @param {callback} cb - callback
* @return {undefined}
*/
get(key, cb) {
return this._client.get(key, cb);
}
/**
* increment value of a key by a given amount and set a ttl
* @param {string} key - key holding the value
@ -172,6 +204,16 @@ class RedisClient {
return this._client.zrangebyscore(key, min, max, cb);
}
/**
* get TTL or expiration in seconds
* @param {string} key - name of key
* @param {function} cb - callback
* @return {undefined}
*/
ttl(key, cb) {
return this._client.ttl(key, cb);
}
clear(cb) {
return this._client.flushdb(cb);
}

View File

@ -90,6 +90,30 @@ class StatsClient {
return this._redis.incrbyEx(key, amount, this._expiry, callback);
}
/**
* Increment the given key by the given value.
* @param {String} key - The Redis key to increment
* @param {Number} incr - The value to increment by
* @param {function} [cb] - callback
* @return {undefined}
*/
incrementKey(key, incr, cb) {
const callback = cb || this._noop;
return this._redis.incrby(key, incr, callback);
}
/**
* Decrement the given key by the given value.
* @param {String} key - The Redis key to decrement
* @param {Number} decr - The value to decrement by
* @param {function} [cb] - callback
* @return {undefined}
*/
decrementKey(key, decr, cb) {
const callback = cb || this._noop;
return this._redis.decrby(key, decr, callback);
}
/**
* report/record a request that ended up being a 500 on the server
* @param {string} id - service identifier

View File

@ -1,3 +1,5 @@
const async = require('async');
const StatsClient = require('./StatsClient');
/**
* @class StatsModel
@ -6,6 +8,141 @@ const StatsClient = require('./StatsClient');
* rather than by seconds
*/
class StatsModel extends StatsClient {
/**
* Utility method to convert 2d array rows to columns, and vice versa
* See also: https://docs.ruby-lang.org/en/2.0.0/Array.html#method-i-zip
* @param {array} arrays - 2d array of integers
* @return {array} converted array
*/
_zip(arrays) {
if (arrays.length > 0 && arrays.every(a => Array.isArray(a))) {
return arrays[0].map((_, i) => arrays.map(a => a[i]));
}
return [];
}
/**
* normalize to the nearest interval
* @param {object} d - Date instance
* @return {number} timestamp - normalized to the nearest interval
*/
_normalizeTimestamp(d) {
const m = d.getMinutes();
return d.setMinutes(m - m % (Math.floor(this._interval / 60)), 0, 0);
}
/**
* override the method to get the count as an array of integers separated
* by each interval
* typical input looks like [[null, '1'], [null, '2'], [null, null]...]
* @param {array} arr - each index contains the result of each batch command
* where index 0 signifies the error and index 1 contains the result
* @return {array} array of integers, ordered from most recent interval to
* oldest interval with length of (expiry / interval)
*/
_getCount(arr) {
const size = Math.floor(this._expiry / this._interval);
const array = arr.reduce((store, i) => {
let num = parseInt(i[1], 10);
num = Number.isNaN(num) ? 0 : num;
store.push(num);
return store;
}, []);
if (array.length < size) {
array.push(...Array(size - array.length).fill(0));
}
return array;
}
/**
* wrapper on `getStats` that handles a list of keys
* override the method to reduce the returned 2d array from `_getCount`
* @param {object} log - Werelogs request logger
* @param {array} ids - service identifiers
* @param {callback} cb - callback to call with the err/result
* @return {undefined}
*/
getAllStats(log, ids, cb) {
if (!this._redis) {
return cb(null, {});
}
const size = Math.floor(this._expiry / this._interval);
const statsRes = {
'requests': Array(size).fill(0),
'500s': Array(size).fill(0),
'sampleDuration': this._expiry,
};
const requests = [];
const errors = [];
if (ids.length === 0) {
return cb(null, statsRes);
}
// for now set concurrency to default of 10
return async.eachLimit(ids, 10, (id, done) => {
this.getStats(log, id, (err, res) => {
if (err) {
return done(err);
}
requests.push(res.requests);
errors.push(res['500s']);
return done();
});
}, error => {
if (error) {
log.error('error getting stats', {
error,
method: 'StatsModel.getAllStats',
});
return cb(null, statsRes);
}
statsRes.requests = this._zip(requests).map(arr =>
arr.reduce((acc, i) => acc + i), 0);
statsRes['500s'] = this._zip(errors).map(arr =>
arr.reduce((acc, i) => acc + i), 0);
return cb(null, statsRes);
});
}
/**
* Handles getting a list of global keys.
* @param {array} ids - Service identifiers
* @param {object} log - Werelogs request logger
* @param {function} cb - Callback
* @return {undefined}
*/
getAllGlobalStats(ids, log, cb) {
const reqsKeys = ids.map(key => (['get', key]));
return this._redis.batch(reqsKeys, (err, res) => {
const statsRes = { requests: 0 };
if (err) {
log.error('error getting metrics', {
error: err,
method: 'StatsClient.getAllGlobalStats',
});
return cb(null, statsRes);
}
statsRes.requests = res.reduce((sum, curr) => {
const [cmdErr, val] = curr;
if (cmdErr) {
// Log any individual request errors from the batch request.
log.error('error getting metrics', {
error: cmdErr,
method: 'StatsClient.getAllGlobalStats',
});
}
return sum + (Number.parseInt(val, 10) || 0);
}, 0);
return cb(null, statsRes);
});
}
/**
* normalize date timestamp to the nearest hour
* @param {Date} d - Date instance
@ -24,34 +161,6 @@ class StatsModel extends StatsClient {
return d.setHours(d.getHours() - 1);
}
/**
* normalize to the nearest interval
* @param {object} d - Date instance
* @return {number} timestamp - normalized to the nearest interval
*/
_normalizeTimestamp(d) {
const m = d.getMinutes();
return d.setMinutes(m - m % (Math.floor(this._interval / 60)), 0, 0);
}
/**
* override the method to get the result as an array of integers separated
* by each interval
* typical input looks like [[null, '1'], [null, '2'], [null, null]...]
* @param {array} arr - each index contains the result of each batch command
* where index 0 signifies the error and index 1 contains the result
* @return {array} array of integers, ordered from most recent interval to
* oldest interval
*/
_getCount(arr) {
return arr.reduce((store, i) => {
let num = parseInt(i[1], 10);
num = Number.isNaN(num) ? 0 : num;
store.push(num);
return store;
}, []);
}
/**
* get list of sorted set key timestamps
* @param {number} epoch - epoch time

View File

@ -3,7 +3,7 @@
"engines": {
"node": ">=6.9.5"
},
"version": "7.4.2",
"version": "7.4.2-bump",
"description": "Common utilities for the S3 project components",
"main": "index.js",
"repository": {

View File

@ -0,0 +1,318 @@
'use strict'; // eslint-disable-line strict
const assert = require('assert');
const async = require('async');
const RedisClient = require('../../../lib/metrics/RedisClient');
const StatsModel = require('../../../lib/metrics/StatsModel');
// setup redis client
const config = {
host: '127.0.0.1',
port: 6379,
enableOfflineQueue: false,
};
const fakeLogger = {
trace: () => {},
error: () => {},
};
const redisClient = new RedisClient(config, fakeLogger);
// setup stats model
const STATS_INTERVAL = 300; // 5 minutes
const STATS_EXPIRY = 86400; // 24 hours
const statsModel = new StatsModel(redisClient, STATS_INTERVAL, STATS_EXPIRY);
function setExpectedStats(expected) {
return expected.concat(
Array((STATS_EXPIRY / STATS_INTERVAL) - expected.length).fill(0));
}
// Since many methods were overwritten, these tests should validate the changes
// made to the original methods
describe('StatsModel class', () => {
const id = 'arsenal-test';
const id2 = 'test-2';
const id3 = 'test-3';
afterEach(() => redisClient.clear(() => {}));
it('should convert a 2d array columns into rows and vice versa using _zip',
() => {
const arrays = [
[1, 2, 3],
[4, 5, 6],
[7, 8, 9],
];
const res = statsModel._zip(arrays);
const expected = [
[1, 4, 7],
[2, 5, 8],
[3, 6, 9],
];
assert.deepStrictEqual(res, expected);
});
it('_zip should return an empty array if given an invalid array', () => {
const arrays = [];
const res = statsModel._zip(arrays);
assert.deepStrictEqual(res, []);
});
it('_getCount should return a an array of all valid integer values',
() => {
const res = statsModel._getCount([
[null, '1'],
[null, '2'],
[null, null],
]);
assert.deepStrictEqual(res, setExpectedStats([1, 2, 0]));
});
it('should correctly record a new request by default one increment',
done => {
async.series([
next => {
statsModel.reportNewRequest(id, (err, res) => {
assert.ifError(err);
const expected = [[null, 1], [null, 1]];
assert.deepStrictEqual(res, expected);
next();
});
},
next => {
statsModel.reportNewRequest(id, (err, res) => {
assert.ifError(err);
const expected = [[null, 2], [null, 1]];
assert.deepStrictEqual(res, expected);
next();
});
},
], done);
});
it('should record new requests by defined amount increments', done => {
function noop() {}
async.series([
next => {
statsModel.reportNewRequest(id, 9);
statsModel.getStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res.requests, setExpectedStats([9]));
next();
});
},
next => {
statsModel.reportNewRequest(id);
statsModel.getStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res.requests,
setExpectedStats([10]));
next();
});
},
next => {
statsModel.reportNewRequest(id, noop);
statsModel.getStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res.requests,
setExpectedStats([11]));
next();
});
},
], done);
});
it('should correctly record a 500 on the server', done => {
statsModel.report500(id, (err, res) => {
assert.ifError(err);
const expected = [[null, 1], [null, 1]];
assert.deepStrictEqual(res, expected);
done();
});
});
it('should respond back with total requests as an array', done => {
async.series([
next => {
statsModel.reportNewRequest(id, err => {
assert.ifError(err);
next();
});
},
next => {
statsModel.report500(id, err => {
assert.ifError(err);
next();
});
},
next => {
statsModel.getStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
const expected = {
'requests': setExpectedStats([1]),
'500s': setExpectedStats([1]),
'sampleDuration': STATS_EXPIRY,
};
assert.deepStrictEqual(res, expected);
next();
});
},
], done);
});
it('should not crash on empty results', done => {
async.series([
next => {
statsModel.getStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
const expected = {
'requests': setExpectedStats([]),
'500s': setExpectedStats([]),
'sampleDuration': STATS_EXPIRY,
};
assert.deepStrictEqual(res, expected);
next();
});
},
next => {
statsModel.getAllStats(fakeLogger, id, (err, res) => {
assert.ifError(err);
const expected = {
'requests': setExpectedStats([]),
'500s': setExpectedStats([]),
'sampleDuration': STATS_EXPIRY,
};
assert.deepStrictEqual(res, expected);
next();
});
},
], done);
});
it('should return a zero-filled array if no ids are passed to getAllStats',
done => {
statsModel.getAllStats(fakeLogger, [], (err, res) => {
assert.ifError(err);
assert.deepStrictEqual(res.requests, setExpectedStats([]));
assert.deepStrictEqual(res['500s'], setExpectedStats([]));
done();
});
});
it('should get accurately reported data for given id from getAllStats',
done => {
statsModel.reportNewRequest(id, 9);
statsModel.reportNewRequest(id2, 2);
statsModel.reportNewRequest(id3, 3);
statsModel.report500(id);
async.series([
next => {
statsModel.getAllStats(fakeLogger, [id], (err, res) => {
assert.ifError(err);
assert.equal(res.requests[0], 9);
assert.equal(res['500s'][0], 1);
next();
});
},
next => {
statsModel.getAllStats(fakeLogger, [id, id2, id3],
(err, res) => {
assert.ifError(err);
assert.equal(res.requests[0], 14);
assert.deepStrictEqual(res.requests,
setExpectedStats([14]));
next();
});
},
], done);
});
it('should normalize to the nearest hour using normalizeTimestampByHour',
() => {
const date = new Date('2018-09-13T23:30:59.195Z');
const newDate = new Date(statsModel.normalizeTimestampByHour(date));
assert.strictEqual(date.getHours(), newDate.getHours());
assert.strictEqual(newDate.getMinutes(), 0);
assert.strictEqual(newDate.getSeconds(), 0);
assert.strictEqual(newDate.getMilliseconds(), 0);
});
it('should get previous hour using _getDatePreviousHour', () => {
const date = new Date('2018-09-13T23:30:59.195Z');
const newDate = statsModel._getDatePreviousHour(new Date(date));
const millisecondsInOneHour = 3600000;
assert.strictEqual(date - newDate, millisecondsInOneHour);
});
it('should get an array of hourly timestamps using getSortedSetHours',
() => {
const epoch = 1536882476501;
const millisecondsInOneHour = 3600000;
const expected = [];
let dateInMilliseconds = statsModel.normalizeTimestampByHour(
new Date(epoch));
for (let i = 0; i < 24; i++) {
expected.push(dateInMilliseconds);
dateInMilliseconds -= millisecondsInOneHour;
}
const res = statsModel.getSortedSetHours(epoch);
assert.deepStrictEqual(res, expected);
});
it('should apply TTL on a new sorted set using addToSortedSet', done => {
const key = 'a-test-key';
const score = 100;
const value = 'a-value';
const now = Date.now();
const nearestHour = statsModel.normalizeTimestampByHour(new Date(now));
statsModel.addToSortedSet(key, score, value, (err, res) => {
assert.ifError(err);
// check both a "zadd" and "expire" occurred
assert.equal(res, 1);
redisClient.ttl(key, (err, res) => {
assert.ifError(err);
// assert this new set has a ttl applied
assert(res > 0);
const adjustmentSecs = now - nearestHour;
const msInADay = 24 * 60 * 60 * 1000;
const msInAnHour = 60 * 60 * 1000;
const upperLimitSecs =
Math.ceil((msInADay - adjustmentSecs) / 1000);
const lowerLimitSecs =
Math.floor((msInADay - adjustmentSecs - msInAnHour) / 1000);
// assert new ttl is between 23 and 24 hours adjusted by time
// elapsed since normalized hourly time
assert(res >= lowerLimitSecs);
assert(res <= upperLimitSecs);
done();
});
});
});
});