Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Chan 8588d21512 tmp 2020-01-02 22:20:54 +00:00
5 changed files with 150 additions and 1261 deletions

View File

@ -1,274 +0,0 @@
const NEW_OBJ = 0;
const NEW_VER = 1;
const UPDATE_VER = 2;
const UPDATE_MST = 3;
const RESTORE = 4;
const DEL_VER = 0;
const DEL_MST = 1;
const CURR = 'curr';
const PREV = 'prev';
function deepCopyObject(obj) {
return JSON.parse(JSON.stringify(obj));
}
class DataCounter {
/**
* DataCounter - class for keeping track of the ItemCount metrics
* @return {DataCounter} DataCounter object
*/
constructor() {
this.objects = 0;
this.versions = 0;
this.dataManaged = {
total: { curr: 0, prev: 0 },
byLocation: {},
};
this.stalled = 0;
this.populated = false;
this.transientList = {};
}
/**
* updateTransientList - update data counter list of transient locations
* @param {Object} newLocations - list of locations constraint details
* @return {undefined}
*/
updateTransientList(newLocations) {
if (newLocations && Object.keys(newLocations).length > 0) {
const tempList = {};
Object.keys(newLocations).forEach(loc => {
tempList[loc] = newLocations[loc].isTransient;
});
this.transientList = tempList;
}
}
/**
* set - set DataCounter values
* @param {Object} setVal - object containing values to be used for setting
* DataCounter
* @param {number} setVal.objects - number of master objects
* @param {number} setVal.versions - number of versioned objects
* @param {Object} setVal.dataManaged - object containing information about
* all the data managed
* @param {Object} setVal.total - object containing the total byte count of
* data managed
* @param {number} setVal.total.curr - the total byte count of master
* objects
* @param {number} setVal.total.prev - the total byte count of versioned
* objects
* @param {Object} setVal.byLocaton - object containing the information
* about data managed on each location
* @return {undefined}
*/
set(setVal) {
if (setVal) {
this.objects = setVal.objects;
this.versions = setVal.versions;
this.dataManaged = deepCopyObject(setVal.dataManaged);
this.populated = true;
this.stalled = setVal.stalled;
}
}
/**
* results - creates a deep copy of the current DataCounter values
* @return {Object} - object containing the current DataCounter values
*/
results() {
const obj = {
objects: this.objects,
versions: this.versions,
dataManaged: this.dataManaged,
stalled: this.stalled,
};
return deepCopyObject(obj);
}
/**
* addObjectFn - performing add operations
* @param {ObjectMD} currMD - new master version metadata
* @param {ObjectMD} prevMD - old master version metadata
* @param {number} type - index of the current type of add operation
* @return {undefined}
*/
addObject(currMD, prevMD, type) {
if (type !== undefined && type !== null && this.populated) {
switch (type) {
case NEW_OBJ: // add new object, replace master if needed
if (prevMD) {
this._delValue(prevMD, CURR);
this._addValue(currMD, CURR);
} else {
++this.objects;
this._addValue(currMD, CURR);
}
break;
case NEW_VER: // add new object, archive master
++this.versions;
this._delValue(prevMD, CURR);
this._addValue(prevMD, PREV);
this._addValue(currMD, CURR);
break;
case UPDATE_VER: // update archived object, replication info
this._updateObject(currMD, prevMD, PREV);
break;
case UPDATE_MST: // update master object, replication info
this._updateObject(currMD, prevMD, CURR);
break;
case RESTORE:
--this.versions;
this._delValue(currMD, PREV);
++this.objects;
this._addValue(currMD, CURR);
break;
default:
// should throw error, noop
break;
}
}
}
/**
* delObjectFn - performing del operations
* @param {ObjectMD} currMD - object metadata
* @param {number} type - index of the current type of delete operation
* @return {undefined}
*/
delObject(currMD, type) {
if (type !== undefined && type !== null && this.populated) {
switch (type) {
case DEL_VER:
--this.versions;
this._delValue(currMD, PREV);
break;
case DEL_MST:
--this.objects;
this._delValue(currMD, CURR);
break;
default:
// should throw error, noop
break;
}
}
}
_addLocation(site, size, type) {
this.dataManaged.total[type] += size;
if (!this.dataManaged.byLocation[site]) {
this.dataManaged.byLocation[site] = {
curr: 0,
prev: 0,
};
}
this.dataManaged.byLocation[site][type] += size;
}
/**
* _addValue - helper function for handling put object updates
* @param {ObjectMD} objMD - object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_addValue(objMD, type) {
if (objMD) {
const { replicationInfo, 'content-length': size } = objMD;
const { backends } = replicationInfo || {};
this._addLocation(objMD.dataStoreName, size, type);
if (backends && Array.isArray(backends)) {
backends.forEach(loc => {
const { site, status } = loc;
if (status === 'COMPLETED') {
this._addLocation(site, size, type);
}
});
}
}
}
/**
* _updateObject - helper function for handling updates from replication
* info changes
* @param {ObjectMD} currMD - new object metadata
* @param {ObjectMD} prevMD - old object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_updateObject(currMD, prevMD, type) {
const transientList = Object.assign({}, this.transientList);
if (currMD && prevMD) {
// check for changes in replication
const { replicationInfo: currLocs,
'content-length': size, dataStoreName } = currMD;
const { replicationInfo: prevLocs } = prevMD;
const { backends: prevBackends } = prevLocs || {};
const { backends: currBackends } = currLocs || {};
const oldLocs = {};
if (prevBackends && Array.isArray(prevBackends)) {
prevBackends.forEach(loc => {
const { site, status } = loc;
oldLocs[site] = status;
});
}
if (currBackends && Array.isArray(currBackends)) {
currBackends.forEach(loc => {
const { site, status } = loc;
if (site in oldLocs && status === 'COMPLETED' &&
oldLocs[site] !== status) {
this._addLocation(site, size, type);
}
});
}
if (currLocs.status === 'COMPLETED' &&
transientList[dataStoreName]) {
this._delLocation(dataStoreName, size, type);
}
}
}
_delLocation(site, size, type) {
if (this.dataManaged.byLocation[site]) {
this.dataManaged.total[type] -= size;
this.dataManaged.total[type] =
Math.max(0, this.dataManaged.total[type]);
this.dataManaged.byLocation[site][type] -= size;
this.dataManaged.byLocation[site][type] =
Math.max(0, this.dataManaged.byLocation[site][type]);
}
}
/**
* _delValue - helper function for handling delete object operations
* @param {ObjectMD} objMD - object metadata
* @param {string} type - string with value either 'curr' or 'prev'
* @return {undefined}
*/
_delValue(objMD, type) {
if (objMD) {
const { replicationInfo, 'content-length': size } = objMD;
const { backends } = replicationInfo || {};
this._delLocation(objMD.dataStoreName, size, type);
if (backends && Array.isArray(backends)) {
backends.forEach(loc => {
const { site, status } = loc;
if (status === 'COMPLETED') {
this._delLocation(site, size, type);
}
});
}
}
}
}
module.exports = {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
};

View File

@ -27,7 +27,6 @@ const listAlgos = require('../../../algos/list/exportAlgos');
const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils');
const { DataCounter } = require('./DataCounter');
const Skip = require('../../../algos/list/skip');
const USERSBUCKET = '__usersbucket';
@ -40,7 +39,7 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default
const SOCKET_TIMEOUT_MS = 360000;
const NUM_PARALLEL_COUNT_PROC = 1;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
let uidCounter = 0;
@ -92,7 +91,7 @@ function generatePHDVersion(versionId) {
class MongoClientInterface {
constructor(params) {
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
database, logger, replicationGroupId, authCredentials, config,
database, logger, replicationGroupId, authCredentials,
isLocationTransient } = params;
const cred = MongoUtils.credPrefix(authCredentials);
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
@ -104,20 +103,7 @@ class MongoClientInterface {
this.path = path;
this.replicationGroupId = replicationGroupId;
this.database = database;
this.dataCount = new DataCounter();
this.isLocationTransient = isLocationTransient;
if (config && config instanceof EventEmitter) {
this.config = config;
this.config.on('location-constraints-update', () => {
this.dataCount
.updateTransientList(this.config.locationConstraints);
if (this.config.isTest) {
this.config.emit('MongoClientTestDone');
}
});
this.dataCount.updateTransientList(this.config.locationConstraints);
}
}
setup(cb) {
@ -1283,43 +1269,51 @@ class MongoClientInterface {
store.buckets = bucketCount;
store.bucketList = retBucketInfos;
return async.eachLimit(bucketInfos, 10, (bucketInfo, done) => {
async.waterfall([
next => this._getIsTransient(bucketInfo, log, next),
(isTransient, next) => {
const bucketName = bucketInfo.getName();
this.getObjectMDStats(bucketName, bucketInfo,
isTransient, log, next);
},
], (err, results) => {
let concurrentCounts = NUM_PARALLEL_COUNT_PROC;
if (process.env.NUM_PARALLEL_COUNT_PROC &&
!Number.isNaN(process.env.NUM_PARALLEL_COUNT_PROC)) {
concurrentCounts = Number
.parseInt(process.env.NUM_PARALLEL_COUNT_PROC, 1);
}
return async.eachLimit(bucketInfos, concurrentCounts,
(bucketInfo, done) => {
async.waterfall([
next => this._getIsTransient(bucketInfo, log, next),
(isTransient, next) => {
const bucketName = bucketInfo.getName();
this.getObjectMDStats(bucketName, bucketInfo,
isTransient, log, next);
},
], (err, results) => {
if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) {
return cb(err);
}
// save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
return cb(null, store);
// save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
return cb(null, store);
});
});
});
});
return undefined;
}
@ -1429,37 +1423,28 @@ class MongoClientInterface {
return results;
}
_handleMongo(c, filter, isTransient, log, cb) {
const reducedFields = {
'_id': 1,
'value.versionId': 1,
'value.replicationInfo.status': 1,
'value.replicationInfo.backends': 1,
'value.content-length': 1,
'value.dataStoreName': 1,
};
_handleData(data, repData) {
const retResult = this._handleEntries(data);
const repDataEntries = this._handleEntries(repData);
Object.keys(repDataEntries).forEach(site => {
if (!retResult[site]) {
retResult[site] = 0;
}
retResult[site] += repDataEntries[site];
});
return retResult;
}
const aggCount = [
{ $project: { _id: 1 } },
{ $group: { _id: null, count: { $sum: 1 } } },
_createCountPipeline(filter) {
return [
{ $match: filter },
{ $count: 'count' },
];
}
const aggData = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
} },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];
const aggRepData = [
{ $project: {
'value.replicationInfo.backends': 1,
'value.content-length': 1,
} },
_createRepDataPipeline(filter) {
return [
{ $match: filter },
{ $unwind: '$value.replicationInfo.backends' },
{ $match: {
'value.replicationInfo.backends.status': { $eq: 'COMPLETED' },
@ -1469,155 +1454,119 @@ class MongoClientInterface {
bytes: { $sum: '$value.content-length' },
} },
];
}
const aggCompleted = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
'inComplete': {
$eq: ['$value.replicationInfo.status', 'COMPLETED'],
},
} },
{ $match: { inComplete: true } },
_createDataPipeline(isTransient, filter) {
const matchStage = !isTransient
? filter
: Object.assign(
{},
filter,
{ 'value.replicationInfo.status': { $ne: 'COMPLETED' } }
);
return [
{ $match: matchStage },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];
return c.aggregate([
{ $project: reducedFields },
{ $match: filter },
{ $facet: {
count: aggCount,
data: aggData,
repData: aggRepData,
compData: aggCompleted,
} },
]).toArray((err, res) => {
if (err) {
log.error('Error when processing mongo entries', {
method: '_handleMongo',
error: err,
});
return cb(err);
}
if (!res || res.length < 1) {
log.debug('aggregate returned empty results', {
method: '_handleMongo',
});
return cb(null, {});
}
const agg = res[0];
if (!agg || !agg.count || !agg.data || !agg.repData) {
log.debug('missing field in aggregate results', {
method: '_handleMongo',
});
return cb(null, {});
}
const retResult = {
count: this._handleCount(agg.count[0] || {}),
data: this._handleEntries(agg.data),
};
const repDataEntries = this._handleEntries(agg.repData);
Object.keys(repDataEntries).forEach(site => {
if (!retResult.data[site]) {
retResult.data[site] = 0;
}
retResult.data[site] += repDataEntries[site];
});
if (isTransient && agg.compData) {
const compDataEntries = this._handleEntries(agg.compData);
Object.keys(compDataEntries).forEach(site => {
if (retResult.data[site]) {
retResult.data[site] -= compDataEntries[site];
retResult.data[site] =
Math.max(0, retResult.data[site]);
}
});
}
return cb(null, retResult);
});
}
_getStalled(c, cmpDate, log, cb) {
const reducedFields = {
'_id': {
id: '$_id',
status: '$value.replicationInfo.status',
},
'value.last-modified': 1,
_aggregateCount(c, params, callback) {
const { cmpDate, isTransient } = params;
const mstFilter = {
'isVersioned': { $eq: -1 },
'value.versionId': { $exists: true },
};
const verFilter = { isVersioned: { $ne: -1 } };
const nullFilter = {
'isVersioned': { $eq: -1 },
'value.versionId': { $exists: false },
};
return c.aggregate([
{ $project: reducedFields },
{ $match: {
'_id.id': { $regex: /\0/ },
'_id.status': { $eq: 'PENDING' },
} },
]).toArray((err, res) => {
{
$project: {
'_id': 1,
'value.replicationInfo': 1,
'value.last-modified': {
$dateFromString: { dateString: '$value.last-modified' },
},
'value.dataStoreName': 1,
'value.content-length': 1,
'value.versionId': 1,
'isVersioned': { $indexOfCP: ['$_id', '\0'] },
},
},
{
$facet: {
stalled: [
{ $match: Object.assign(
{},
verFilter,
{
'value.replicationInfo.status': { $in: [
'PENDING',
] },
'value.last-modified': {
$lt: cmpDate,
},
}
) },
{ $count: 'count' },
],
nullCount: this._createCountPipeline(nullFilter),
nullRepData: this._createRepDataPipeline(nullFilter),
nullData: this._createDataPipeline(isTransient, nullFilter),
mstCount: this._createCountPipeline(mstFilter),
mstRepData: this._createRepDataPipeline(mstFilter),
mstData: this._createDataPipeline(isTransient, mstFilter),
verCount: this._createCountPipeline(verFilter),
verRepData: this._createRepDataPipeline(verFilter),
verData: this._createDataPipeline(isTransient, verFilter),
},
},
], { allowDiskUse: true }).toArray((err, res) => {
if (err) {
log.debug('Unable to retrieve stalled entries', {
error: err,
});
return cb(null);
return callback(err);
}
const count = res.filter(data => {
if (!data || typeof data !== 'object' ||
!data.value || typeof data.value !== 'object') {
return false;
}
const time = data.value['last-modified'] || null;
if (isNaN(Date.parse(time))) {
return false;
}
const testDate = new Date(time);
return testDate <= cmpDate;
}).length;
return cb(null, count);
const resObj = {
masterCount: this._handleCount(res[0].mstCount[0]),
masterData: this._handleData(res[0].mstData,
res[0].mstRepData),
nullCount: this._handleCount(res[0].nullCount[0]),
nullData: this._handleData(res[0].nullData,
res[0].nullRepData),
versionCount: this._handleCount(res[0].verCount[0]),
versionData: this._handleData(res[0].verData,
res[0].verRepData),
stalled: this._handleCount(res[0].stalled[0]),
};
return callback(null, resObj);
});
}
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
const c = this.getCollection(bucketName);
const mstFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: true },
};
const verFilter = { _id: { $regex: /\0/ } };
const nullFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: false },
};
const cmpDate = new Date();
cmpDate.setHours(cmpDate.getHours() - 1);
async.parallel({
version: done =>
this._handleMongo(c, verFilter, isTransient, log, done),
null: done =>
this._handleMongo(c, nullFilter, isTransient, log, done),
master: done =>
this._handleMongo(c, mstFilter, isTransient, log, done),
stalled: done =>
this._getStalled(c, cmpDate, log, done),
}, (err, res) => {
const params = {
cmpDate,
isTransient,
};
return this._aggregateCount(c, params, (err, res) => {
if (err) {
log.error('Error when perform count items aggregate', {
method: 'getObjectMDStats',
error: err,
});
return callback(err);
}
const resObj = {
masterCount: res.master.count || 0,
masterData: res.master.data || {},
nullCount: res.null.count || 0,
nullData: res.null.data || {},
versionCount: res.version.count || 0,
versionData: res.version.data || {},
};
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(resObj, isVer);
retResult.stalled = res.stalled || 0;
const retResult = this._handleResults(res, isVer);
retResult.stalled = res.stalled;
return callback(null, retResult);
});
}

View File

@ -1,507 +0,0 @@
const assert = require('assert');
const {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');
const refZeroObj = {
objects: 0,
versions: 0,
dataManaged: {
total: { curr: 0, prev: 0 },
byLocation: {},
},
stalled: 0,
};
const refSingleObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
},
},
stalled: 0,
};
const refSingleObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 100, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
},
},
stalled: 0,
};
const refMultiObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 200, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 100, prev: 100 },
},
},
stalled: 0,
};
const refMultiObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
stalled: 0,
};
const singleSite = size => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [],
},
});
const multiSite = (size, isComplete) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [{
site: 'locationTwo',
status: isComplete ? 'COMPLETED' : 'PENDING',
}],
},
});
const transientSite = (size, status, backends) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': { status, backends },
});
const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};
const dataCounter = new DataCounter();
describe('DataCounter Class', () => {
it('should create a zero object', () => {
dataCounter.set(refZeroObj);
assert.deepStrictEqual(dataCounter.results(), refZeroObj);
});
it('should skip dataCounter methods if initial values are not set', () => {
const testCounter = new DataCounter();
testCounter.addObject(singleSite(100), null, NEW_OBJ);
assert.deepStrictEqual(testCounter.results(), refZeroObj);
});
});
describe('DateCounter::updateTransientList', () => {
afterEach(() => dataCounter.updateTransientList({}));
it('should set transient list', () => {
assert.deepStrictEqual(dataCounter.transientList, {});
dataCounter.updateTransientList(locationConstraints);
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(dataCounter.transientList, expectedRes);
});
});
describe('DataCounter::addObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, new object one site',
init: refZeroObj,
input: [singleSite(100), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new object multi site',
init: refZeroObj,
input: [multiSite(100, true), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 250, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 500, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
locationTwo: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 250, prev: 50 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 500, prev: 100 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
locationTwo: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly ignore pending status, multi site',
init: refZeroObj,
input: [multiSite(100, false), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in master object',
init: refSingleObj,
input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in versioned object',
init: refSingleObjVer,
input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'restoring versioned object as master',
init: refMultiObjVer,
input: [multiSite(100, true), multiSite(100, true), RESTORE],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
describe('DataCounter, update with transient location', () => {
before(() => dataCounter.updateTransientList(locationConstraints));
after(() => dataCounter.updateTransientList({}));
const pCurrMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'COMPLETED' },
]);
const cCurrMD = transientSite(100, 'COMPLETED', [
{ site: 'site1', status: 'COMPLETED' },
{ site: 'site2', status: 'COMPLETED' },
]);
const prevMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'PENDING' },
]);
const transientTest = [
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
site1: { curr: 0, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
site1: { curr: 100, prev: 0 },
site2: { curr: 100, prev: 0 },
},
},
},
},
];
transientTest.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
describe('DataCounter::delObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, ' +
'delete master object single site',
init: refMultiObj,
input: [singleSite(100), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete master object multi site',
init: refMultiObj,
input: [multiSite(100, true), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object single site',
init: refMultiObjVer,
input: [singleSite(100), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object multi site',
init: refMultiObjVer,
input: [multiSite(100, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should clamp negative values to 0, master object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_MST],
expectedRes: {
objects: 0, versions: 1,
dataManaged: {
total: { curr: 0, prev: 200 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should clamp negative values to 0, versioned object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.delObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});

View File

@ -2,47 +2,11 @@ const assert = require('assert');
const MongoClientInterface = require(
'../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const DummyMongoDB = require('./utils/DummyMongoDB');
const DummyConfigObject = require('./utils/DummyConfigObject');
const DummyRequestLogger = require('./utils/DummyRequestLogger');
const log = new DummyRequestLogger();
const mongoTestClient = new MongoClientInterface({});
mongoTestClient.db = new DummyMongoDB();
describe('MongoClientInterface, init behavior', () => {
let s3ConfigObj;
const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};
beforeEach(() => {
s3ConfigObj = new DummyConfigObject();
});
it('should set DataCounter transientList when declaring a ' +
'new MongoClientInterface object', () => {
s3ConfigObj.setLocationConstraints(locationConstraints);
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
});
it('should update DataCounter transientList if location constraints ' +
'are updated', done => {
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
assert.deepStrictEqual(mongoClient.dataCount.transientList, {});
const expectedRes = { locationOne: true, locationTwo: false };
s3ConfigObj.once('MongoClientTestDone', () => {
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
return done();
});
s3ConfigObj.setLocationConstraints(locationConstraints);
});
});
describe('MongoClientInterface::_handleResults', () => {
it('should return zero-result', () => {
@ -103,149 +67,9 @@ describe('MongoClientInterface::_handleResults', () => {
});
});
describe('MongoClientInterface::_handleMongo', () => {
beforeEach(() => mongoTestClient.db.reset());
it('should return error if mongo aggregate fails', done => {
const retValues = [new Error('testError')];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log, err => {
assert(err, 'Expected error, but got success');
return done();
});
});
it('should return empty object if mongo aggregate has no results', done => {
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});
it('should return empty object if mongo aggregate has missing results',
done => {
const retValues = [[{
count: undefined,
data: undefined,
repData: undefined,
}]];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});
const testRetValues = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 1000 },
{ _id: 'locationtwo', bytes: 1000 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];
it('should return correct results, transient false', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 1000,
locationtwo: 1000,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
it('should return correct results, transient true', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 500,
locationtwo: 500,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
const testRetValuesNeg = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 100 },
{ _id: 'locationtwo', bytes: 100 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];
it('should return clamp negative values to 0', done => {
mongoTestClient.db.setReturnValues(testRetValuesNeg);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 0,
locationtwo: 0,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
});
describe('MongoClientInterface, misc', () => {
let s3ConfigObj;
beforeEach(() => {
s3ConfigObj = new DummyConfigObject();
});
it('should filter out collections with special names', () => {
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
const mongoClient = new MongoClientInterface({});
assert.equal(mongoClient._isSpecialCollection('__foo'), true);
assert.equal(mongoClient._isSpecialCollection('bar'), false);
});

View File

@ -1,103 +0,0 @@
const testError = new Error('test error');
class DummyCollection {
constructor(name, isFail) {
this.s = {
name,
};
this.fail = isFail;
this.retQueue = [];
}
setReturnValues(retArray) {
this.retQueue.push(...retArray);
}
aggregate() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}
bulkWrite(cmds, opt, cb) {
process.stdout.write('mock mongodb.bulkWrite call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}
update(target, doc, opt, cb) {
process.stdout.write('mock mongodb.update call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}
find() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}
findOne(query, opt, cb) {
if (typeof opt === 'function' && cb === undefined) {
// eslint-disable-next-line no-param-reassign
cb = opt;
}
if (this.retQueue.length <= 0) {
return cb(null);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
}
}
class DummyMongoDB {
contructor() {
this.fail = false;
this.returnQueue = [];
}
reset() {
this.fail = false;
this.returnQueue = [];
}
setReturnValues(retValues) {
this.returnQueue.push(...retValues);
}
collection(name) {
const c = new DummyCollection(name, this.fail);
c.setReturnValues(this.returnQueue);
return c;
}
}
module.exports = DummyMongoDB;