Compare commits
1 Commits
developmen
...
ft/S3C-121
Author | SHA1 | Date |
---|---|---|
Rahul Padigela | b166d90ee0 |
|
@ -15,5 +15,7 @@
|
|||
"vaultd": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 8500
|
||||
}
|
||||
},
|
||||
"expireMetrics": false,
|
||||
"expireMetricsTTL": 0
|
||||
}
|
||||
|
|
|
@ -164,7 +164,11 @@ class Config {
|
|||
throw new Error('bad config: both certFilePaths.key and ' +
|
||||
'certFilePaths.cert must be defined');
|
||||
}
|
||||
|
||||
if (config.expireMetrics !== undefined) {
|
||||
assert(typeof config.expireMetrics === 'boolean', 'bad config: ' +
|
||||
'expireMetrics must be a boolean');
|
||||
this.expireMetrics = config.expireMetrics === true;
|
||||
}
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -161,7 +161,7 @@ class Datastore {
|
|||
}
|
||||
|
||||
/**
|
||||
* execute a pipeline of commands
|
||||
* execute a batch of commands
|
||||
* @param {string[]} cmds - list of commands
|
||||
* @param {callback} cb - callback
|
||||
* @return {undefined}
|
||||
|
@ -170,6 +170,29 @@ class Datastore {
|
|||
return this._client.pipeline(cmds).exec(cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* execute a list of commands as transaction
|
||||
* @param {string[]} cmds - list of commands
|
||||
* @param {callback} cb - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
multi(cmds, cb) {
|
||||
return this._client.multi(cmds).exec((err, res) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const flattenRes = [];
|
||||
const resErr = res.find(item => {
|
||||
flattenRes.push(item[1]);
|
||||
return item[0] !== null;
|
||||
});
|
||||
if (resErr) {
|
||||
return cb(resErr);
|
||||
}
|
||||
return cb(null, flattenRes);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* remove elements from the sorted set that fall between the min and max
|
||||
* scores
|
||||
|
@ -236,6 +259,10 @@ class Datastore {
|
|||
publish(channel, message, cb) {
|
||||
return this._client.publish(channel, message, cb);
|
||||
}
|
||||
|
||||
scan(cursor, pattern, cb) {
|
||||
return this._client.scan(cursor, 'match', pattern, cb);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Datastore;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
const assert = require('assert');
|
||||
const { doUntil, parallel } = require('async');
|
||||
const werelogs = require('werelogs');
|
||||
const Datastore = require('./Datastore');
|
||||
const { generateKey, generateCounter, generateStateKey } = require('./schema');
|
||||
|
@ -7,7 +8,7 @@ const redisClient = require('../utils/redisClient');
|
|||
|
||||
const methods = {
|
||||
createBucket: { method: '_genericPushMetric', changesData: true },
|
||||
deleteBucket: { method: '_genericPushMetric', changesData: true },
|
||||
deleteBucket: { method: '_pushMetricDeleteBucket', changesData: true },
|
||||
listBucket: { method: '_genericPushMetric', changesData: false },
|
||||
getBucketAcl: { method: '_genericPushMetric', changesData: false },
|
||||
putBucketAcl: { method: '_genericPushMetric', changesData: true },
|
||||
|
@ -75,6 +76,8 @@ class UtapiClient {
|
|||
* types to push metrics for
|
||||
* @param {array} [config.component] - The component from which the metrics
|
||||
* are being pushed (e.g., 's3')
|
||||
* @param {boolean} [config.expireMetrics] - Boolean to expire metrics
|
||||
* when buckets are deleted.
|
||||
*/
|
||||
constructor(config) {
|
||||
const api = (config || {}).logApi || werelogs;
|
||||
|
@ -107,6 +110,8 @@ class UtapiClient {
|
|||
this.service = config.component;
|
||||
}
|
||||
this.disableClient = false;
|
||||
this.expireMetrics = config.expireMetrics;
|
||||
this.expireTTL = config.expireTTL || 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -381,6 +386,73 @@ class UtapiClient {
|
|||
});
|
||||
}
|
||||
|
||||
_pushMetricDeleteBucket(params, timestamp, action, log, callback) {
|
||||
this._genericPushMetric(params, timestamp, action, log, err => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (!this.expireMetrics) {
|
||||
return callback();
|
||||
}
|
||||
const { bucket } = params;
|
||||
const statelessKeysGlob = `s3:buckets:*:${bucket}:*`;
|
||||
const statefulKeysGlob = `s3:buckets:${bucket}:*`;
|
||||
return parallel([
|
||||
done => this._scanKeys(statelessKeysGlob, log, done),
|
||||
done => this._scanKeys(statefulKeysGlob, log, done),
|
||||
], (err, keys) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return this._expireMetrics([].concat(...keys), log, callback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
_scanKeys(pattern, log, callback) {
|
||||
let cursor = '0';
|
||||
const keys = [];
|
||||
doUntil(
|
||||
done => this.ds.scan(cursor, pattern, (err, res) => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
cursor = res[0];
|
||||
// builds an array like [['expire', <key>, <ttl>], ...]
|
||||
// keys.push(...res[1].map(val => ['expire', val]));
|
||||
keys.push(...res[1]);
|
||||
return done();
|
||||
}),
|
||||
// if cursor is 0, it reached end of scan
|
||||
() => cursor === '0',
|
||||
err => callback(err, keys)
|
||||
);
|
||||
}
|
||||
// builds an array like [['expire', <key>, <ttl>], ...]
|
||||
// keys.push(...res[1].map(val => ['expire', val]));
|
||||
|
||||
_expireMetrics(keys, log, callback) {
|
||||
// expire metrics here
|
||||
const expireCmds = keys.map(k => ['expire', k, this.expireTTL]);
|
||||
return this.ds.multi(expireCmds, (err, result) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
// each delete command gets a score 1 if it's a success,
|
||||
// should match the total commands sent for deletion
|
||||
const allKeysDeleted =
|
||||
keys.length === result.reduce((a, v) => a + v, 0);
|
||||
if (!allKeysDeleted) {
|
||||
log.debug('error expiring keys', { delResult: result });
|
||||
return callback(
|
||||
errors.InternalError.customizeDescription(
|
||||
'error expiring some keys')
|
||||
);
|
||||
}
|
||||
return callback();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates counter for the putDeleteMarkerObject action
|
||||
* @param {object} params - params for the metrics
|
||||
|
|
Loading…
Reference in New Issue