Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Chan f290d0542c version bump test 2018-06-06 19:25:17 -07:00
Alexander Chan 000b903217 ft: ZENKO-433 add item count support incremental update and refresh 2018-06-06 18:08:37 -07:00
Alexander Chan d2e1fd3a57 rf: use mongodb aggregate method for item count 2018-06-06 18:06:18 -07:00
4 changed files with 916 additions and 234 deletions

View File

@ -0,0 +1,256 @@
const NEW_OBJ = 0;
const NEW_VER = 1;
const UPDATE_VER = 2;
const UPDATE_MST = 3;
const RESTORE = 4;
const DEL_VER = 0;
const DEL_MST = 1;
const CURR = 'curr';
const PREV = 'prev';
function deepCopyObject(obj) {
return JSON.parse(JSON.stringify(obj));
}
class DataCounter {
/**
* DataCounter - class for keeping track of the ItemCount metrics
* @return {DataCounter} DataCounter object
*/
constructor() {
this.objects = 0;
this.versions = 0;
this.buckets = 0;
this.bucketList = [];
this.dataManaged = {
total: { curr: 0, prev: 0 },
byLocation: {},
};
this.busy = false;
this.populated = false;
}
/**
* set - set DataCounter values
* @param {Object} setVal - object containing values to be used for setting
* DataCounter
* @param {number} setVal.objects - number of master objects
* @param {number} setVal.versions - number of versioned objects
* @param {number} setVal.buckets - number of buckets
* @param {string[]} setVal.bucketList - list of bucket names
* @param {Object} setVal.dataManaged - object containing information about
* all the data managed
* @param {Object} setVal.total - object containing the total byte count of
* data managed
* @param {number} setVal.total.curr - the total byte count of master
* objects
* @param {number} setVal.total.prev - the total byte count of versioned
* objects
* @param {Object} setVal.byLocaton - object containing the information
* about data managed on each location
* @return {undefined}
*/
set(setVal) {
if (setVal) {
this.objects = setVal.objects;
this.versions = setVal.versions;
this.buckets = setVal.buckets;
this.bucketList = [...setVal.bucketList];
this.dataManaged = deepCopyObject(setVal.dataManaged);
this.busy = false;
this.populated = true;
}
}
/**
* results - creates a deep copy of the current DataCounter values
* @return {Object} - object containing the current DataCounter values
*/
results() {
const obj = {
objects: this.objects,
versions: this.versions,
buckets: this.buckets,
bucketList: this.bucketList,
dataManaged: this.dataManaged,
};
return deepCopyObject(obj);
}
/**
* addObjectFn - performing add operations
* @param {ObjectMD} currMD - new master version metadata
* @param {ObjectMD} prevMD - old master version metadata
* @param {number} type - index of the current type of add operation
* @return {undefined}
*/
addObject(currMD, prevMD, type) {
if (type !== undefined && type !== null && this.populated) {
switch (type) {
case NEW_OBJ: // add new object, replace master if needed
if (prevMD) {
this._delValue(prevMD, CURR);
this._addValue(currMD, CURR);
} else {
++this.objects;
this._addValue(currMD, CURR);
}
break;
case NEW_VER: // add new object, archive master
++this.versions;
this._delValue(prevMD, CURR);
this._addValue(prevMD, PREV);
this._addValue(currMD, CURR);
break;
case UPDATE_VER: // update archived object, replication info
this._updateObject(currMD, prevMD, PREV);
break;
case UPDATE_MST: // update master object, replication info
this._updateObject(currMD, prevMD, CURR);
break;
case RESTORE:
--this.versions;
this._delValue(currMD, PREV);
++this.objects;
this._addValue(currMD, CURR);
break;
default:
// should throw error, noop
break;
}
}
}
/**
* delObjectFn - performing del operations
* @param {ObjectMD} currMD - object metadata
* @param {number} type - index of the current type of delete operation
* @return {undefined}
*/
delObject(currMD, type) {
if (type !== undefined && type !== null && this.populated) {
switch (type) {
case DEL_VER:
--this.versions;
this._delValue(currMD, PREV);
break;
case DEL_MST:
--this.objects;
this._delValue(currMD, CURR);
break;
default:
// should throw error, noop
break;
}
}
}
_addLocation(site, size, type) {
this.dataManaged.total[type] += size;
if (!this.dataManaged.byLocation[site]) {
this.dataManaged.byLocation[site] = {
curr: 0,
prev: 0,
};
}
this.dataManaged.byLocation[site][type] += size;
}
/**
* _addValue - helper function for handling put object updates
* @param {ObjectMD} objMD - object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_addValue(objMD, type) {
if (objMD) {
const { replicationInfo, 'content-length': size } = objMD;
const { backends } = replicationInfo || {};
this._addLocation(objMD.dataStoreName, size, type);
if (backends && Array.isArray(backends)) {
backends.forEach(loc => {
const { site, status } = loc;
if (status === 'COMPLETED') {
this._addLocation(site, size, type);
}
});
}
}
}
/**
* _updateObject - helper function for handling updates from replication
* info changes
* @param {ObjectMD} currMD - new object metadata
* @param {ObjectMD} prevMD - old object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_updateObject(currMD, prevMD, type) {
if (currMD && prevMD) {
// check for changes in replication
const { replicationInfo: currLocs,
'content-length': size } = currMD;
const { replicationInfo: prevLocs } = prevMD;
const { backends: prevBackends } = prevLocs || {};
const { backends: currBackends } = currLocs || {};
const oldLocs = {};
if (prevBackends && Array.isArray(prevBackends)) {
prevBackends.forEach(loc => {
const { site, status } = loc;
oldLocs[site] = status;
});
}
if (currBackends && Array.isArray(currBackends)) {
currBackends.forEach(loc => {
const { site, status } = loc;
if (site in oldLocs && status === 'COMPLETED' &&
oldLocs[site] !== status) {
this._addLocation(site, size, type);
}
});
}
}
}
_delLocation(site, size, type) {
if (this.dataManaged.byLocation[site]) {
this.dataManaged.total[type] -= size;
this.dataManaged.byLocation[site][type] -= size;
}
}
/**
* _delValue - helper function for handling delete object operations
* @param {ObjectMD} objMD - object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_delValue(objMD, type) {
if (objMD) {
const { replicationInfo, 'content-length': size } = objMD;
const { backends } = replicationInfo || {};
this._delLocation(objMD.dataStoreName, size, type);
if (backends && Array.isArray(backends)) {
backends.forEach(loc => {
const { site, status } = loc;
if (status === 'COMPLETED') {
this._delLocation(site, size, type);
}
});
}
}
}
}
module.exports = {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
};

View File

@ -25,6 +25,16 @@ const listAlgos = require('../../../algos/list/exportAlgos');
const MongoReadStream = require('./readStream'); const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils'); const MongoUtils = require('./utils');
const {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
} = require('./DataCounter');
const USERSBUCKET = '__usersbucket'; const USERSBUCKET = '__usersbucket';
const METASTORE = '__metastore'; const METASTORE = '__metastore';
@ -65,6 +75,13 @@ function generatePHDVersion(versionId) {
}; };
} }
function isUserBucket(bucketName) {
return bucketName !== METASTORE &&
bucketName !== INFOSTORE &&
bucketName !== USERSBUCKET &&
bucketName !== PENSIEVE &&
!bucketName.startsWith(constants.mpuBucketPrefix);
}
/** /**
* @constructor * @constructor
@ -94,7 +111,7 @@ class MongoClientInterface {
this.replicationGroupId = replicationGroupId; this.replicationGroupId = replicationGroupId;
this.database = database; this.database = database;
this.lastItemScanTime = null; this.lastItemScanTime = null;
this.lastItemScanResult = null; this.dataCount = new DataCounter();
} }
setup(cb) { setup(cb) {
@ -477,21 +494,107 @@ class MongoClientInterface {
}); });
} }
_getPrevMD(c, params, isUpdate, log, cb) {
const collectionName = c.s.name;
const { objName, versionId } = params;
if (isUserBucket(collectionName)) {
if (isUpdate) {
return c.find({
'value.versionId': versionId,
}, {}).toArray((err, entries) => {
if (err) {
log.error(err.message, {
error: err,
});
}
return cb(null, entries);
});
}
return c.findOne({
_id: objName,
}, {}, (err, preVal) => {
if (err) {
log.error(err.message, {
error: err,
});
return cb();
}
return cb(null, preVal);
});
}
return cb();
}
putObject(bucketName, objName, objVal, params, log, cb) { putObject(bucketName, objName, objVal, params, log, cb) {
MongoUtils.serialize(objVal); MongoUtils.serialize(objVal);
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
if (params && params.versioning && !params.versionId) { const countParams = {
return this.putObjectVerCase1(c, bucketName, objName, objVal, objName,
params, log, cb); versionId: params.versionId,
} else if (params && params.versionId === '') { };
return this.putObjectVerCase2(c, bucketName, objName, objVal, const isUpdate = params && params.versionId;
params, log, cb); const isVer = params && ((params.versioning && !params.versionId) ||
} else if (params && params.versionId) { params.versionId);
return this.putObjectVerCase3(c, bucketName, objName, objVal,
params, log, cb); async.series({
} prevMD: done =>
return this.putObjectNoVer(c, bucketName, objName, objVal, this._getPrevMD(c, countParams, isUpdate, log, done),
params, log, cb); putObjectRes: done => {
if (params && params.versioning && !params.versionId) {
return this.putObjectVerCase1(
c, bucketName, objName, objVal, params, log, done);
} else if (params && params.versionId === '') {
return this.putObjectVerCase2(
c, bucketName, objName, objVal, params, log, done);
} else if (params && params.versionId) {
return this.putObjectVerCase3(
c, bucketName, objName, objVal, params, log, done);
}
return this.putObjectNoVer(
c, bucketName, objName, objVal, params, log, done);
},
}, (err, result) => {
if (err) {
return cb(err);
}
const { prevMD, putObjectRes } = result;
if (isUpdate) {
switch (prevMD.length) {
case 2:
// length == 2 => master entry + version entry
// updating master object MD
this.dataCount.addObject(
objVal, prevMD[0].value, UPDATE_MST);
break;
case 1:
// length == 1 => version entry
// updating version object MD
this.dataCount.addObject(
objVal, prevMD[0].value, UPDATE_VER);
break;
case 0:
// changing null id to versioning compatible id
// no data count changes
break;
default:
// something seems wrong if length is not 0, 1, or 2
log.debug('invalid number of object entries');
break;
}
} else {
if (!prevMD || !prevMD.value) {
this.dataCount.addObject(
objVal, null, NEW_OBJ);
} else if (isVer) {
this.dataCount.addObject(
objVal, prevMD.value, NEW_VER);
} else {
this.dataCount.addObject(
objVal, prevMD.value, NEW_OBJ);
}
}
return cb(null, putObjectRes);
});
} }
getObject(bucketName, objName, params, log, cb) { getObject(bucketName, objName, params, log, cb) {
@ -585,6 +688,9 @@ class MongoClientInterface {
{ error: err.message }); { error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
if (isUserBucket(c.s.name)) {
this.dataCount.addObject(objVal, null, RESTORE);
}
return cb(null); return cb(null);
}); });
} }
@ -742,10 +848,26 @@ class MongoClientInterface {
if (mst.value.isPHD || if (mst.value.isPHD ||
mst.value.versionId === params.versionId) { mst.value.versionId === params.versionId) {
return this.deleteObjectVerMaster(c, bucketName, objName, return this.deleteObjectVerMaster(c, bucketName, objName,
params, log, cb); params, log, err => {
if (err) {
return cb(err);
}
if (isUserBucket(c.s.name)) {
this.dataCount.delObject(mst.value, DEL_MST);
}
return cb();
});
} }
return this.deleteObjectVerNotMaster(c, bucketName, objName, return this.deleteObjectVerNotMaster(c, bucketName, objName,
params, log, cb); params, log, err => {
if (err) {
return cb(err);
}
if (isUserBucket(c.s.name)) {
this.dataCount.delObject(mst.value, DEL_VER);
}
return cb();
});
}); });
} }
@ -768,6 +890,10 @@ class MongoClientInterface {
{ error: err.message }); { error: err.message });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
const objMD = result.value || {};
if (isUserBucket(c.s.name)) {
this.dataCount.delObject(objMD.value, DEL_MST);
}
return cb(null); return cb(null);
}); });
} }
@ -894,10 +1020,10 @@ class MongoClientInterface {
} }
countItems(log, cb) { countItems(log, cb) {
if (this.lastItemScanTime !== null && // if (this.lastItemScanTime !== null &&
(Date.now() - this.lastItemScanTime) <= itemScanRefreshDelay) { // (Date.now() - this.lastItemScanTime) <= itemScanRefreshDelay) {
return process.nextTick(cb, null, this.lastItemScanResult); // return process.nextTick(cb, null, this.lastItemScanResult);
} // }
const res = { const res = {
objects: 0, objects: 0,
@ -965,12 +1091,11 @@ class MongoClientInterface {
if (err) { if (err) {
return next(errors.InternalError); return next(errors.InternalError);
} }
if (results.bucketList && results.counter && if (results.bucketList && results.dataManaged) {
results.counter.dataManaged) {
res.bucketList.push(results.bucketList); res.bucketList.push(results.bucketList);
res.objects += results.counter.objects; res.objects += results.objects;
res.versions += results.counter.versions; res.versions += results.versions;
consolidateData(results.counter.dataManaged); consolidateData(results.dataManaged);
} }
return next(); return next();
}); });
@ -979,173 +1104,62 @@ class MongoClientInterface {
return cb(err); return cb(err);
} }
this.lastItemScanTime = Date.now(); this.lastItemScanTime = Date.now();
this.lastItemScanResult = res; this.dataCount.set(res);
return cb(null, res); return cb(null, this.dataCount.results());
}); });
}); });
return undefined; return undefined;
} }
getDataManaged(c, query, log, cb) { _handleResults(res, isVersioned) {
const dataManaged = { const total = { curr: 0, prev: 0 };
total: 0, const locations = {};
locations: {},
};
c.find(query).toArray((err, entries) => {
if (err) {
log.error('error occured in mongo client', {
method: 'getDataManaged',
error: err,
});
return cb(err);
}
entries.forEach(entry => {
const { backends } = entry.value.replicationInfo;
const objMD = entry.value;
dataManaged.total += objMD['content-length'];
if (!dataManaged.locations[objMD.dataStoreName]) {
dataManaged.locations[objMD.dataStoreName] =
objMD['content-length'];
} else {
dataManaged.locations[objMD.dataStoreName] +=
objMD['content-length'];
}
// replication list
backends.forEach(location => {
const { site } = location;
if (!dataManaged.locations[site]) {
dataManaged.locations[site] = objMD['content-length'];
} else {
dataManaged.locations[site] += objMD['content-length'];
}
});
});
return cb(null, {
objCount: entries.length,
dataManaged,
});
});
}
getDataInfoNoVer(c, log, callback) { Object.keys(res.nullData).forEach(loc => {
return this.getDataManaged(c, {}, log, (err, res) => { const bytes = res.nullData[loc];
if (err) { const locName = loc === 'mem' || loc === 'file' ?
log.error('error occured in mongo client', { 'us-east-1' : loc;
method: 'getDataInfoNoVer', if (!locations[locName]) {
error: err, locations[locName] = { curr: 0, prev: 0 };
});
return callback(err);
} }
const dataManaged = { total.curr += bytes;
total: { curr: 0, prev: 0 }, locations[locName].curr += bytes;
locations: {},
};
const data = res.dataManaged.locations;
// add total
dataManaged.total.curr += res.dataManaged.total;
Object.keys(data).forEach(loc => {
if (!dataManaged.locations[loc]) {
dataManaged.locations[loc] = { prev: 0, curr: 0 };
}
dataManaged.locations[loc].curr = data[loc];
});
return callback(null, {
objects: res.objCount,
versions: 0,
dataManaged,
});
}); });
} if (isVersioned) {
Object.keys(res.versionData).forEach(loc => {
getDataInfoVer(c, mstVerIds, log, callback) { const bytes = res.versionData[loc];
// query for removing placeholder entries const locName = loc === 'mem' || loc === 'file' ?
/* eslint-disable quote-props */ 'us-east-1' : loc;
const queryFilter = { if (!locations[locName]) {
'$or': [ locations[locName] = { curr: 0, prev: 0 };
{ '_id': { '$regex': /\0.*$/g } }, }
{ 'value.isNull': { '$exists': true } }, total.prev += bytes;
{ locations[locName].prev += bytes;
'value.nullVersionId': { '$exists': false }, });
'value.versionId': { '$exists': false }, }
}, Object.keys(res.masterData).forEach(loc => {
], const bytes = res.masterData[loc];
}; const locName = loc === 'mem' || loc === 'file' ?
/* eslint-enable quote-props */ 'us-east-1' : loc;
return async.series({ if (!locations[locName]) {
master: done => { locations[locName] = { curr: 0, prev: 0 };
// query for getting only master version entries }
const mstFilter = []; total.curr += bytes;
/* eslint-disable quote-props */ locations[locName].curr += bytes;
mstVerIds.forEach(id => { if (isVersioned) {
if (id) { total.prev -= bytes;
mstFilter.push({ 'value.versionId': { '$eq': id } }); locations[locName].prev -= bytes;
} }
}); });
const query = { return {
'$and': [ versions: isVersioned ?
queryFilter, res.versionCount - res.masterCount : 0,
{ '$or': (mstFilter.length ? mstFilter : [{}]) }, objects: res.masterCount + res.nullCount,
], dataManaged: {
}; total,
/* eslint-enable quote-props */ locations,
return this.getDataManaged(c, query, log, done);
}, },
archived: done => { };
// query for getting only versioned entries
const mstFilter = [];
/* eslint-disable quote-props */
mstVerIds.forEach(id => {
if (id) {
mstFilter.push({ 'value.versionId': { '$ne': id } });
}
});
const query = {
'$and': [
queryFilter,
{ '$and': (mstFilter.length ? mstFilter : [{}]) },
],
};
/* eslint-enable quote-props */
return this.getDataManaged(c, query, log, done);
},
}, (err, res) => {
if (err) {
log.error('error occured in mongo client', {
method: 'getDataInfoVer',
error: err,
});
return callback(err);
}
const dataManaged = {
total: { curr: 0, prev: 0 },
locations: {},
};
const mstData = res.master.dataManaged.locations;
const verData = res.archived.dataManaged.locations;
// add total
dataManaged.total.curr += res.master.dataManaged.total;
dataManaged.total.prev += res.archived.dataManaged.total;
Object.keys(mstData).forEach(loc => {
if (!dataManaged.locations[loc]) {
dataManaged.locations[loc] = { prev: 0, curr: 0 };
}
dataManaged.locations[loc].curr = mstData[loc];
});
Object.keys(verData).forEach(loc => {
if (!dataManaged.locations[loc]) {
dataManaged.locations[loc] = { prev: 0, curr: 0 };
}
dataManaged.locations[loc].prev = verData[loc];
});
return callback(null, {
objects: res.master.objCount,
versions: res.archived.objCount,
dataManaged,
});
});
} }
getObjectMDStats(bucketName, bucketInfo, log, callback) { getObjectMDStats(bucketName, bucketInfo, log, callback) {
@ -1154,60 +1168,109 @@ class MongoClientInterface {
name: bucketName, name: bucketName,
location: bucketInfo.getLocationConstraint(), location: bucketInfo.getLocationConstraint(),
}; };
if (bucketInfo && bucketInfo._versioningConfiguration &&
(bucketInfo._versioningConfiguration.Status === 'Suspended' || const mstFilter = {
bucketInfo._versioningConfiguration.Status === 'Enabled')) { '_id': { $regex: /^[^\0]+$/ },
// if versioning is enabled 'value.versionId': { $exists: true },
c.distinct('_id').then(keys => { };
const trimmedKeys = keys.map(key => key.replace(/\0.*$/g, '')); const verFilter = { _id: { $regex: /\0/ } };
const uniqKeys = trimmedKeys.filter( const nullFilter = {
(key, index, self) => self.indexOf(key) === index); '_id': { $regex: /^[^\0]+$/ },
// for each uniqKey get master version id 'value.versionId': { $exists: false },
return async.map(uniqKeys, (key, done) => { };
this.getLatestVersion(c, key, log, (err, mst) => { const grpObj = {
if (err) { _id: '$value.dataStoreName',
if (err.NoSuchKey) { bytes: { $sum: '$value.content-length' },
log.debug('NoSuchKey master info', { };
method: 'getObjectMDStats', const grpRepObj = {
error: err, _id: '$value.replicationInfo.backends.site',
}); bytes: { $sum: '$value.content-length' },
return done(); };
}
log.error('unable to retrieve master info', { const _handleCount = (err, entries, cb) => {
method: 'getObjectMDStats', if (err) {
error: err, return cb(err);
}); }
return done(err); return cb(null,
} entries && entries.length > 0 ? entries.length : 0);
return done(null, mst.versionId); };
}); const _handleEntries = (err, entries, cb) => {
}, (err, mstVerIds) => { if (err) {
if (err) { return cb(err);
return callback(err); }
} const results = {};
return this.getDataInfoVer(c, mstVerIds, log, if (entries) {
(err, res) => { entries.forEach(entry => {
if (err) { results[entry._id] = entry.bytes;
return callback(err);
}
return callback(null, {
bucketList: retBucketInfo,
counter: res,
});
});
}); });
}).catch(callback); }
} else { return cb(null, results);
this.getDataInfoNoVer(c, log, (err, res) => { };
if (err) {
return callback(err); const _handleMongo = (cursor, cb) => async.parallel({
count: done => {
const tmpCursor = cursor.clone();
tmpCursor.toArray(
(err, res) => _handleCount(err, res, done));
},
data: done => {
const tmpCursor = cursor.clone();
tmpCursor.group(grpObj)
.toArray((err, res) => _handleEntries(err, res, done));
},
repData: done => {
const tmpCursor = cursor.clone();
tmpCursor.unwind('$value.replicationInfo.backends')
.group(grpRepObj)
.toArray((err, res) => _handleEntries(err, res, done));
},
}, (err, res) => {
if (err) {
log.error('Error when processing mongo entries', {
method: '_handleMongo',
error: err,
});
return cb(err);
}
const retResult = {
count: res.count,
data: Object.assign({}, res.data),
};
Object.keys(res.repData).forEach(site => {
if (!retResult.data[site]) {
retResult.data[site] = 0;
} }
return callback(null, { retResult.data[site] += res.repData[site];
bucketList: retBucketInfo,
counter: res,
});
}); });
} return cb(null, retResult);
});
const mstCursor = c.aggregate().match(mstFilter);
const verCursor = c.aggregate().match(verFilter);
const nullCursor = c.aggregate().match(nullFilter);
async.parallel({
version: done => _handleMongo(verCursor, done),
null: done => _handleMongo(nullCursor, done),
master: done => _handleMongo(mstCursor, done),
}, (err, res) => {
if (err) {
return callback(err);
}
const resObj = {
masterCount: res.master.count || 0,
masterData: res.master.data || {},
nullCount: res.null.count || 0,
nullData: res.null.data || {},
versionCount: res.version.count || 0,
versionData: res.version.data || {},
};
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(resObj, isVer);
retResult.bucketList = retBucketInfo;
return callback(null, retResult);
});
} }
} }

View File

@ -3,7 +3,7 @@
"engines": { "engines": {
"node": ">=6.9.5" "node": ">=6.9.5"
}, },
"version": "8.0.0", "version": "8.0.0-test-1.0",
"description": "Common utilities for the S3 project components", "description": "Common utilities for the S3 project components",
"main": "index.js", "main": "index.js",
"repository": { "repository": {

View File

@ -0,0 +1,363 @@
const assert = require('assert');
const {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');
const refZeroObj = {
objects: 0,
versions: 0,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 0, prev: 0 },
byLocation: {},
},
};
const refSingleObj = {
objects: 2,
versions: 0,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
},
},
};
const refSingleObjVer = {
objects: 1,
versions: 1,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 100, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
},
},
};
const refMultiObjVer = {
objects: 1,
versions: 1,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 200, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 100, prev: 100 },
},
},
};
const refMultiObj = {
objects: 2,
versions: 0,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
};
// eslint-disable-next-line quote-props
const singleSite = size => ({
'content-length': size,
dataStoreName: 'locationOne',
replicationInfo: {
backends: [],
},
});
// eslint-disable-next-line quote-props
const multiSite = (size, isComplete) => ({
'content-length': size,
dataStoreName: 'locationOne',
replicationInfo: {
backends: [{
site: 'locationTwo',
status: isComplete ? 'COMPLETED' : 'PENDING',
}],
},
});
const dataCounter = new DataCounter();
describe('DataCounter Class', () => {
it('should create a zero object', () => {
dataCounter.set(refZeroObj);
assert.deepStrictEqual(dataCounter.results(), refZeroObj);
});
it('should skip dataCounter methods if initial values are not set', () => {
const testCounter = new DataCounter();
testCounter.addObject(singleSite(100), null, NEW_OBJ);
assert.deepStrictEqual(testCounter.results(), refZeroObj);
});
});
describe('DataCounter::addObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, new object one site',
init: refZeroObj,
input: [singleSite(100), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new object multi site',
init: refZeroObj,
input: [multiSite(100, true), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 250, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 500, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
locationTwo: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 250, prev: 50 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 500, prev: 100 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
locationTwo: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly ignore pending status, multi site',
init: refZeroObj,
input: [multiSite(100, false), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in master object',
init: refSingleObj,
input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in versioned object',
init: refSingleObjVer,
input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'restoring versioned object as master',
init: refMultiObjVer,
input: [multiSite(100, true), multiSite(100, true), RESTORE],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
describe('DataCounter::delObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, ' +
'delete master object single site',
init: refMultiObj,
input: [singleSite(100), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete master object multi site',
init: refMultiObj,
input: [multiSite(100, true), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object single site',
init: refMultiObjVer,
input: [singleSite(100), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object multi site',
init: refMultiObjVer,
input: [multiSite(100, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.delObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});