Compare commits
1 Commits
developmen
...
ft/S3C-121
Author | SHA1 | Date |
---|---|---|
Rahul Padigela | b166d90ee0 |
|
@ -15,5 +15,7 @@
|
||||||
"vaultd": {
|
"vaultd": {
|
||||||
"host": "127.0.0.1",
|
"host": "127.0.0.1",
|
||||||
"port": 8500
|
"port": 8500
|
||||||
}
|
},
|
||||||
|
"expireMetrics": false,
|
||||||
|
"expireMetricsTTL": 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,7 +164,11 @@ class Config {
|
||||||
throw new Error('bad config: both certFilePaths.key and ' +
|
throw new Error('bad config: both certFilePaths.key and ' +
|
||||||
'certFilePaths.cert must be defined');
|
'certFilePaths.cert must be defined');
|
||||||
}
|
}
|
||||||
|
if (config.expireMetrics !== undefined) {
|
||||||
|
assert(typeof config.expireMetrics === 'boolean', 'bad config: ' +
|
||||||
|
'expireMetrics must be a boolean');
|
||||||
|
this.expireMetrics = config.expireMetrics === true;
|
||||||
|
}
|
||||||
return config;
|
return config;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -161,7 +161,7 @@ class Datastore {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* execute a pipeline of commands
|
* execute a batch of commands
|
||||||
* @param {string[]} cmds - list of commands
|
* @param {string[]} cmds - list of commands
|
||||||
* @param {callback} cb - callback
|
* @param {callback} cb - callback
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
|
@ -170,6 +170,29 @@ class Datastore {
|
||||||
return this._client.pipeline(cmds).exec(cb);
|
return this._client.pipeline(cmds).exec(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* execute a list of commands as transaction
|
||||||
|
* @param {string[]} cmds - list of commands
|
||||||
|
* @param {callback} cb - callback
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
multi(cmds, cb) {
|
||||||
|
return this._client.multi(cmds).exec((err, res) => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
const flattenRes = [];
|
||||||
|
const resErr = res.find(item => {
|
||||||
|
flattenRes.push(item[1]);
|
||||||
|
return item[0] !== null;
|
||||||
|
});
|
||||||
|
if (resErr) {
|
||||||
|
return cb(resErr);
|
||||||
|
}
|
||||||
|
return cb(null, flattenRes);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove elements from the sorted set that fall between the min and max
|
* remove elements from the sorted set that fall between the min and max
|
||||||
* scores
|
* scores
|
||||||
|
@ -236,6 +259,10 @@ class Datastore {
|
||||||
publish(channel, message, cb) {
|
publish(channel, message, cb) {
|
||||||
return this._client.publish(channel, message, cb);
|
return this._client.publish(channel, message, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
scan(cursor, pattern, cb) {
|
||||||
|
return this._client.scan(cursor, 'match', pattern, cb);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = Datastore;
|
module.exports = Datastore;
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
|
const { doUntil, parallel } = require('async');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
const Datastore = require('./Datastore');
|
const Datastore = require('./Datastore');
|
||||||
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
||||||
|
@ -7,7 +8,7 @@ const redisClient = require('../utils/redisClient');
|
||||||
|
|
||||||
const methods = {
|
const methods = {
|
||||||
createBucket: { method: '_genericPushMetric', changesData: true },
|
createBucket: { method: '_genericPushMetric', changesData: true },
|
||||||
deleteBucket: { method: '_genericPushMetric', changesData: true },
|
deleteBucket: { method: '_pushMetricDeleteBucket', changesData: true },
|
||||||
listBucket: { method: '_genericPushMetric', changesData: false },
|
listBucket: { method: '_genericPushMetric', changesData: false },
|
||||||
getBucketAcl: { method: '_genericPushMetric', changesData: false },
|
getBucketAcl: { method: '_genericPushMetric', changesData: false },
|
||||||
putBucketAcl: { method: '_genericPushMetric', changesData: true },
|
putBucketAcl: { method: '_genericPushMetric', changesData: true },
|
||||||
|
@ -75,6 +76,8 @@ class UtapiClient {
|
||||||
* types to push metrics for
|
* types to push metrics for
|
||||||
* @param {array} [config.component] - The component from which the metrics
|
* @param {array} [config.component] - The component from which the metrics
|
||||||
* are being pushed (e.g., 's3')
|
* are being pushed (e.g., 's3')
|
||||||
|
* @param {boolean} [config.expireMetrics] - Boolean to expire metrics
|
||||||
|
* when buckets are deleted.
|
||||||
*/
|
*/
|
||||||
constructor(config) {
|
constructor(config) {
|
||||||
const api = (config || {}).logApi || werelogs;
|
const api = (config || {}).logApi || werelogs;
|
||||||
|
@ -107,6 +110,8 @@ class UtapiClient {
|
||||||
this.service = config.component;
|
this.service = config.component;
|
||||||
}
|
}
|
||||||
this.disableClient = false;
|
this.disableClient = false;
|
||||||
|
this.expireMetrics = config.expireMetrics;
|
||||||
|
this.expireTTL = config.expireTTL || 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,6 +386,73 @@ class UtapiClient {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_pushMetricDeleteBucket(params, timestamp, action, log, callback) {
|
||||||
|
this._genericPushMetric(params, timestamp, action, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (!this.expireMetrics) {
|
||||||
|
return callback();
|
||||||
|
}
|
||||||
|
const { bucket } = params;
|
||||||
|
const statelessKeysGlob = `s3:buckets:*:${bucket}:*`;
|
||||||
|
const statefulKeysGlob = `s3:buckets:${bucket}:*`;
|
||||||
|
return parallel([
|
||||||
|
done => this._scanKeys(statelessKeysGlob, log, done),
|
||||||
|
done => this._scanKeys(statefulKeysGlob, log, done),
|
||||||
|
], (err, keys) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
return this._expireMetrics([].concat(...keys), log, callback);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_scanKeys(pattern, log, callback) {
|
||||||
|
let cursor = '0';
|
||||||
|
const keys = [];
|
||||||
|
doUntil(
|
||||||
|
done => this.ds.scan(cursor, pattern, (err, res) => {
|
||||||
|
if (err) {
|
||||||
|
return done(err);
|
||||||
|
}
|
||||||
|
cursor = res[0];
|
||||||
|
// builds an array like [['expire', <key>, <ttl>], ...]
|
||||||
|
// keys.push(...res[1].map(val => ['expire', val]));
|
||||||
|
keys.push(...res[1]);
|
||||||
|
return done();
|
||||||
|
}),
|
||||||
|
// if cursor is 0, it reached end of scan
|
||||||
|
() => cursor === '0',
|
||||||
|
err => callback(err, keys)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
// builds an array like [['expire', <key>, <ttl>], ...]
|
||||||
|
// keys.push(...res[1].map(val => ['expire', val]));
|
||||||
|
|
||||||
|
_expireMetrics(keys, log, callback) {
|
||||||
|
// expire metrics here
|
||||||
|
const expireCmds = keys.map(k => ['expire', k, this.expireTTL]);
|
||||||
|
return this.ds.multi(expireCmds, (err, result) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
// each delete command gets a score 1 if it's a success,
|
||||||
|
// should match the total commands sent for deletion
|
||||||
|
const allKeysDeleted =
|
||||||
|
keys.length === result.reduce((a, v) => a + v, 0);
|
||||||
|
if (!allKeysDeleted) {
|
||||||
|
log.debug('error expiring keys', { delResult: result });
|
||||||
|
return callback(
|
||||||
|
errors.InternalError.customizeDescription(
|
||||||
|
'error expiring some keys')
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return callback();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates counter for the putDeleteMarkerObject action
|
* Updates counter for the putDeleteMarkerObject action
|
||||||
* @param {object} params - params for the metrics
|
* @param {object} params - params for the metrics
|
||||||
|
|
Loading…
Reference in New Issue