Compare commits

...

1 Commits

Author SHA1 Message Date
Rahul Padigela b166d90ee0 ft: expire bucket metrics on delete bucket 2018-06-01 14:13:34 -07:00
5 changed files with 110 additions and 5 deletions

View File

@ -15,5 +15,7 @@
"vaultd": { "vaultd": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 8500 "port": 8500
} },
"expireMetrics": false,
"expireMetricsTTL": 0
} }

View File

@ -164,7 +164,11 @@ class Config {
throw new Error('bad config: both certFilePaths.key and ' + throw new Error('bad config: both certFilePaths.key and ' +
'certFilePaths.cert must be defined'); 'certFilePaths.cert must be defined');
} }
if (config.expireMetrics !== undefined) {
assert(typeof config.expireMetrics === 'boolean', 'bad config: ' +
'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics === true;
}
return config; return config;
} }
} }

View File

@ -161,7 +161,7 @@ class Datastore {
} }
/** /**
* execute a pipeline of commands * execute a batch of commands
* @param {string[]} cmds - list of commands * @param {string[]} cmds - list of commands
* @param {callback} cb - callback * @param {callback} cb - callback
* @return {undefined} * @return {undefined}
@ -170,6 +170,29 @@ class Datastore {
return this._client.pipeline(cmds).exec(cb); return this._client.pipeline(cmds).exec(cb);
} }
/**
* execute a list of commands as transaction
* @param {string[]} cmds - list of commands
* @param {callback} cb - callback
* @return {undefined}
*/
multi(cmds, cb) {
return this._client.multi(cmds).exec((err, res) => {
if (err) {
return cb(err);
}
const flattenRes = [];
const resErr = res.find(item => {
flattenRes.push(item[1]);
return item[0] !== null;
});
if (resErr) {
return cb(resErr);
}
return cb(null, flattenRes);
});
}
/** /**
* remove elements from the sorted set that fall between the min and max * remove elements from the sorted set that fall between the min and max
* scores * scores
@ -236,6 +259,10 @@ class Datastore {
publish(channel, message, cb) { publish(channel, message, cb) {
return this._client.publish(channel, message, cb); return this._client.publish(channel, message, cb);
} }
scan(cursor, pattern, cb) {
return this._client.scan(cursor, 'match', pattern, cb);
}
} }
module.exports = Datastore; module.exports = Datastore;

View File

@ -1,4 +1,5 @@
const assert = require('assert'); const assert = require('assert');
const { doUntil, parallel } = require('async');
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const { generateKey, generateCounter, generateStateKey } = require('./schema'); const { generateKey, generateCounter, generateStateKey } = require('./schema');
@ -7,7 +8,7 @@ const redisClient = require('../utils/redisClient');
const methods = { const methods = {
createBucket: { method: '_genericPushMetric', changesData: true }, createBucket: { method: '_genericPushMetric', changesData: true },
deleteBucket: { method: '_genericPushMetric', changesData: true }, deleteBucket: { method: '_pushMetricDeleteBucket', changesData: true },
listBucket: { method: '_genericPushMetric', changesData: false }, listBucket: { method: '_genericPushMetric', changesData: false },
getBucketAcl: { method: '_genericPushMetric', changesData: false }, getBucketAcl: { method: '_genericPushMetric', changesData: false },
putBucketAcl: { method: '_genericPushMetric', changesData: true }, putBucketAcl: { method: '_genericPushMetric', changesData: true },
@ -75,6 +76,8 @@ class UtapiClient {
* types to push metrics for * types to push metrics for
* @param {array} [config.component] - The component from which the metrics * @param {array} [config.component] - The component from which the metrics
* are being pushed (e.g., 's3') * are being pushed (e.g., 's3')
* @param {boolean} [config.expireMetrics] - Boolean to expire metrics
* when buckets are deleted.
*/ */
constructor(config) { constructor(config) {
const api = (config || {}).logApi || werelogs; const api = (config || {}).logApi || werelogs;
@ -107,6 +110,8 @@ class UtapiClient {
this.service = config.component; this.service = config.component;
} }
this.disableClient = false; this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireTTL = config.expireTTL || 0;
} }
} }
@ -381,6 +386,73 @@ class UtapiClient {
}); });
} }
_pushMetricDeleteBucket(params, timestamp, action, log, callback) {
this._genericPushMetric(params, timestamp, action, log, err => {
if (err) {
return callback(err);
}
if (!this.expireMetrics) {
return callback();
}
const { bucket } = params;
const statelessKeysGlob = `s3:buckets:*:${bucket}:*`;
const statefulKeysGlob = `s3:buckets:${bucket}:*`;
return parallel([
done => this._scanKeys(statelessKeysGlob, log, done),
done => this._scanKeys(statefulKeysGlob, log, done),
], (err, keys) => {
if (err) {
return callback(err);
}
return this._expireMetrics([].concat(...keys), log, callback);
});
});
}
_scanKeys(pattern, log, callback) {
let cursor = '0';
const keys = [];
doUntil(
done => this.ds.scan(cursor, pattern, (err, res) => {
if (err) {
return done(err);
}
cursor = res[0];
// builds an array like [['expire', <key>, <ttl>], ...]
// keys.push(...res[1].map(val => ['expire', val]));
keys.push(...res[1]);
return done();
}),
// if cursor is 0, it reached end of scan
() => cursor === '0',
err => callback(err, keys)
);
}
// builds an array like [['expire', <key>, <ttl>], ...]
// keys.push(...res[1].map(val => ['expire', val]));
_expireMetrics(keys, log, callback) {
// expire metrics here
const expireCmds = keys.map(k => ['expire', k, this.expireTTL]);
return this.ds.multi(expireCmds, (err, result) => {
if (err) {
return callback(err);
}
// each delete command gets a score 1 if it's a success,
// should match the total commands sent for deletion
const allKeysDeleted =
keys.length === result.reduce((a, v) => a + v, 0);
if (!allKeysDeleted) {
log.debug('error expiring keys', { delResult: result });
return callback(
errors.InternalError.customizeDescription(
'error expiring some keys')
);
}
return callback();
});
}
/** /**
* Updates counter for the putDeleteMarkerObject action * Updates counter for the putDeleteMarkerObject action
* @param {object} params - params for the metrics * @param {object} params - params for the metrics