Compare commits
1 commit
developmen...wip/rework
Author | SHA1 | Date
---|---|---
Alexander Chan | 8588d21512 |
@ -1,274 +0,0 @@
const NEW_OBJ = 0;
const NEW_VER = 1;
const UPDATE_VER = 2;
const UPDATE_MST = 3;
const RESTORE = 4;

const DEL_VER = 0;
const DEL_MST = 1;

const CURR = 'curr';
const PREV = 'prev';

function deepCopyObject(obj) {
    return JSON.parse(JSON.stringify(obj));
}

class DataCounter {
    /**
     * DataCounter - class for keeping track of the ItemCount metrics
     * @return {DataCounter} DataCounter object
     */
    constructor() {
        this.objects = 0;
        this.versions = 0;
        this.dataManaged = {
            total: { curr: 0, prev: 0 },
            byLocation: {},
        };
        this.stalled = 0;
        this.populated = false;
        this.transientList = {};
    }

    /**
     * updateTransientList - update data counter list of transient locations
     * @param {Object} newLocations - list of location constraint details
     * @return {undefined}
     */
    updateTransientList(newLocations) {
        if (newLocations && Object.keys(newLocations).length > 0) {
            const tempList = {};
            Object.keys(newLocations).forEach(loc => {
                tempList[loc] = newLocations[loc].isTransient;
            });
            this.transientList = tempList;
        }
    }

    /**
     * set - set DataCounter values
     * @param {Object} setVal - object containing values to be used for
     * setting DataCounter
     * @param {number} setVal.objects - number of master objects
     * @param {number} setVal.versions - number of versioned objects
     * @param {Object} setVal.dataManaged - object containing information about
     * all the data managed
     * @param {Object} setVal.total - object containing the total byte count of
     * data managed
     * @param {number} setVal.total.curr - the total byte count of master
     * objects
     * @param {number} setVal.total.prev - the total byte count of versioned
     * objects
     * @param {Object} setVal.byLocation - object containing the information
     * about data managed on each location
     * @return {undefined}
     */
    set(setVal) {
        if (setVal) {
            this.objects = setVal.objects;
            this.versions = setVal.versions;
            this.dataManaged = deepCopyObject(setVal.dataManaged);
            this.populated = true;
            this.stalled = setVal.stalled;
        }
    }

    /**
     * results - creates a deep copy of the current DataCounter values
     * @return {Object} - object containing the current DataCounter values
     */
    results() {
        const obj = {
            objects: this.objects,
            versions: this.versions,
            dataManaged: this.dataManaged,
            stalled: this.stalled,
        };
        return deepCopyObject(obj);
    }

    /**
     * addObject - performs add operations
     * @param {ObjectMD} currMD - new master version metadata
     * @param {ObjectMD} prevMD - old master version metadata
     * @param {number} type - index of the current type of add operation
     * @return {undefined}
     */
    addObject(currMD, prevMD, type) {
        if (type !== undefined && type !== null && this.populated) {
            switch (type) {
            case NEW_OBJ: // add new object, replace master if needed
                if (prevMD) {
                    this._delValue(prevMD, CURR);
                    this._addValue(currMD, CURR);
                } else {
                    ++this.objects;
                    this._addValue(currMD, CURR);
                }
                break;
            case NEW_VER: // add new object, archive master
                ++this.versions;
                this._delValue(prevMD, CURR);
                this._addValue(prevMD, PREV);
                this._addValue(currMD, CURR);
                break;
            case UPDATE_VER: // update archived object, replication info
                this._updateObject(currMD, prevMD, PREV);
                break;
            case UPDATE_MST: // update master object, replication info
                this._updateObject(currMD, prevMD, CURR);
                break;
            case RESTORE:
                --this.versions;
                this._delValue(currMD, PREV);
                ++this.objects;
                this._addValue(currMD, CURR);
                break;
            default:
                // should throw error, noop
                break;
            }
        }
    }

    /**
     * delObject - performs delete operations
     * @param {ObjectMD} currMD - object metadata
     * @param {number} type - index of the current type of delete operation
     * @return {undefined}
     */
    delObject(currMD, type) {
        if (type !== undefined && type !== null && this.populated) {
            switch (type) {
            case DEL_VER:
                --this.versions;
                this._delValue(currMD, PREV);
                break;
            case DEL_MST:
                --this.objects;
                this._delValue(currMD, CURR);
                break;
            default:
                // should throw error, noop
                break;
            }
        }
    }

    _addLocation(site, size, type) {
        this.dataManaged.total[type] += size;
        if (!this.dataManaged.byLocation[site]) {
            this.dataManaged.byLocation[site] = {
                curr: 0,
                prev: 0,
            };
        }
        this.dataManaged.byLocation[site][type] += size;
    }

    /**
     * _addValue - helper function for handling put object updates
     * @param {ObjectMD} objMD - object metadata
     * @param {string} type - string with value either 'curr' or 'prev'
     * @return {undefined}
     */
    _addValue(objMD, type) {
        if (objMD) {
            const { replicationInfo, 'content-length': size } = objMD;
            const { backends } = replicationInfo || {};
            this._addLocation(objMD.dataStoreName, size, type);
            if (backends && Array.isArray(backends)) {
                backends.forEach(loc => {
                    const { site, status } = loc;
                    if (status === 'COMPLETED') {
                        this._addLocation(site, size, type);
                    }
                });
            }
        }
    }

    /**
     * _updateObject - helper function for handling updates from replication
     * info changes
     * @param {ObjectMD} currMD - new object metadata
     * @param {ObjectMD} prevMD - old object metadata
     * @param {string} type - string with value either 'curr' or 'prev'
     * @return {undefined}
     */
    _updateObject(currMD, prevMD, type) {
        const transientList = Object.assign({}, this.transientList);
        if (currMD && prevMD) {
            // check for changes in replication
            const { replicationInfo: currLocs,
                'content-length': size, dataStoreName } = currMD;
            const { replicationInfo: prevLocs } = prevMD;
            const { backends: prevBackends } = prevLocs || {};
            const { backends: currBackends } = currLocs || {};
            const oldLocs = {};
            if (prevBackends && Array.isArray(prevBackends)) {
                prevBackends.forEach(loc => {
                    const { site, status } = loc;
                    oldLocs[site] = status;
                });
            }
            if (currBackends && Array.isArray(currBackends)) {
                currBackends.forEach(loc => {
                    const { site, status } = loc;
                    if (site in oldLocs && status === 'COMPLETED' &&
                        oldLocs[site] !== status) {
                        this._addLocation(site, size, type);
                    }
                });
            }
            if (currLocs.status === 'COMPLETED' &&
                transientList[dataStoreName]) {
                this._delLocation(dataStoreName, size, type);
            }
        }
    }

    _delLocation(site, size, type) {
        if (this.dataManaged.byLocation[site]) {
            this.dataManaged.total[type] -= size;
            this.dataManaged.total[type] =
                Math.max(0, this.dataManaged.total[type]);
            this.dataManaged.byLocation[site][type] -= size;
            this.dataManaged.byLocation[site][type] =
                Math.max(0, this.dataManaged.byLocation[site][type]);
        }
    }

    /**
     * _delValue - helper function for handling delete object operations
     * @param {ObjectMD} objMD - object metadata
     * @param {string} type - string with value either 'curr' or 'prev'
     * @return {undefined}
     */
    _delValue(objMD, type) {
        if (objMD) {
            const { replicationInfo, 'content-length': size } = objMD;
            const { backends } = replicationInfo || {};
            this._delLocation(objMD.dataStoreName, size, type);
            if (backends && Array.isArray(backends)) {
                backends.forEach(loc => {
                    const { site, status } = loc;
                    if (status === 'COMPLETED') {
                        this._delLocation(site, size, type);
                    }
                });
            }
        }
    }
}

module.exports = {
    NEW_OBJ,
    NEW_VER,
    UPDATE_VER,
    UPDATE_MST,
    RESTORE,
    DEL_VER,
    DEL_MST,
    DataCounter,
};
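For orientation, here is a minimal usage sketch of the class above (a hypothetical caller, not part of this diff). The metadata objects only need the fields `_addValue`/`_delValue` actually read, and the counter ignores all updates until `set()` marks it populated:

const { DataCounter, NEW_OBJ, NEW_VER } = require('./DataCounter');

const counter = new DataCounter();
// Populate first; addObject/delObject are no-ops until `populated` is true.
counter.set({
    objects: 0,
    versions: 0,
    dataManaged: { total: { curr: 0, prev: 0 }, byLocation: {} },
    stalled: 0,
});

// Hypothetical metadata carrying only the fields DataCounter reads.
const md = size => ({
    'content-length': size,
    'dataStoreName': 'locationOne',
    'replicationInfo': { backends: [] },
});

counter.addObject(md(100), null, NEW_OBJ); // new master object
counter.addObject(md(50), md(100), NEW_VER); // archive master as a version
counter.results();
// => { objects: 1, versions: 1,
//      dataManaged: { total: { curr: 50, prev: 100 },
//                     byLocation: { locationOne: { curr: 50, prev: 100 } } },
//      stalled: 0 }

Note that `results()` returns a deep copy, so callers can report metrics without racing later updates to the counter's internal state.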
@ -27,7 +27,6 @@ const listAlgos = require('../../../algos/list/exportAlgos');

const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils');
const { DataCounter } = require('./DataCounter');
const Skip = require('../../../algos/list/skip');

const USERSBUCKET = '__usersbucket';

@ -40,7 +39,7 @@ const ASYNC_REPAIR_TIMEOUT = 15000;

const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default
const SOCKET_TIMEOUT_MS = 360000;

const NUM_PARALLEL_COUNT_PROC = 1;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;

let uidCounter = 0;

@ -92,7 +91,7 @@ function generatePHDVersion(versionId) {

class MongoClientInterface {
    constructor(params) {
        const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
            database, logger, replicationGroupId, authCredentials, config,
            database, logger, replicationGroupId, authCredentials,
            isLocationTransient } = params;
        const cred = MongoUtils.credPrefix(authCredentials);
        this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +

@ -104,20 +103,7 @@ class MongoClientInterface {

        this.path = path;
        this.replicationGroupId = replicationGroupId;
        this.database = database;
        this.dataCount = new DataCounter();
        this.isLocationTransient = isLocationTransient;

        if (config && config instanceof EventEmitter) {
            this.config = config;
            this.config.on('location-constraints-update', () => {
                this.dataCount
                    .updateTransientList(this.config.locationConstraints);
                if (this.config.isTest) {
                    this.config.emit('MongoClientTestDone');
                }
            });
            this.dataCount.updateTransientList(this.config.locationConstraints);
        }
    }

    setup(cb) {
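The constructor wiring removed above keys the DataCounter's transient-location list off the config emitter. A minimal sketch of that contract, assuming a plain EventEmitter stands in for the real S3 config object (the unit tests further down exercise the same path with DummyConfigObject), and assuming MongoClientInterface has been required as in those tests:

const { EventEmitter } = require('events');

const config = new EventEmitter();
config.locationConstraints = {};

const client = new MongoClientInterface({ config });
// client.dataCount.transientList === {} at this point

// Updating the constraints and emitting the event refreshes the list.
config.locationConstraints = {
    locationOne: { isTransient: true },
    locationTwo: { isTransient: false },
};
config.emit('location-constraints-update');
// client.dataCount.transientList is now
// { locationOne: true, locationTwo: false }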
@ -1283,43 +1269,51 @@ class MongoClientInterface {

        store.buckets = bucketCount;
        store.bucketList = retBucketInfos;

        return async.eachLimit(bucketInfos, 10, (bucketInfo, done) => {
            async.waterfall([
                next => this._getIsTransient(bucketInfo, log, next),
                (isTransient, next) => {
                    const bucketName = bucketInfo.getName();
                    this.getObjectMDStats(bucketName, bucketInfo,
                        isTransient, log, next);
                },
            ], (err, results) => {
        let concurrentCounts = NUM_PARALLEL_COUNT_PROC;
        if (process.env.NUM_PARALLEL_COUNT_PROC &&
            !Number.isNaN(Number(process.env.NUM_PARALLEL_COUNT_PROC))) {
            concurrentCounts = Number
                .parseInt(process.env.NUM_PARALLEL_COUNT_PROC, 10);
        }

        return async.eachLimit(bucketInfos, concurrentCounts,
            (bucketInfo, done) => {
                async.waterfall([
                    next => this._getIsTransient(bucketInfo, log, next),
                    (isTransient, next) => {
                        const bucketName = bucketInfo.getName();
                        this.getObjectMDStats(bucketName, bucketInfo,
                            isTransient, log, next);
                    },
                ], (err, results) => {
                if (err) {
                    return done(err);
                }
                if (results.dataManaged) {
                    store.objects += results.objects;
                    store.versions += results.versions;
                    store.stalled += results.stalled;
                    consolidateData(results.dataManaged);
                }
                return done();
            });
        }, err => {
                    if (err) {
                        return done(err);
                    }
                    if (results.dataManaged) {
                        store.objects += results.objects;
                        store.versions += results.versions;
                        store.stalled += results.stalled;
                        consolidateData(results.dataManaged);
                    }
                    return done();
                });
            }, err => {
            if (err) {
                return cb(err);
            }
            // save to infostore
            return this.updateCountItems(store, log, err => {
                if (err) {
                    log.error('error saving count items in mongo', {
                        method: 'scanItemCount',
                        error: err,
                    });
                    return cb(err);
                }
                return cb(null, store);
                // save to infostore
                return this.updateCountItems(store, log, err => {
                    if (err) {
                        log.error('error saving count items in mongo', {
                            method: 'scanItemCount',
                            error: err,
                        });
                        return cb(err);
                    }
                    return cb(null, store);
                });
            });
        });
        });
        return undefined;
    }

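The rewritten scanItemCount derives its bucket-scan concurrency from the environment, falling back to the NUM_PARALLEL_COUNT_PROC constant (1). A small sketch of the override, with a hypothetical value:

// Hypothetical override, e.g. exported by the process manager:
process.env.NUM_PARALLEL_COUNT_PROC = '4';
// Only numeric strings pass the guard; parseInt uses radix 10.
const concurrentCounts =
    Number.parseInt(process.env.NUM_PARALLEL_COUNT_PROC, 10); // 4
// async.eachLimit(bucketInfos, concurrentCounts, ...) then counts up to
// four buckets' object metadata in parallel.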
@ -1429,37 +1423,28 @@

        return results;
    }

    _handleMongo(c, filter, isTransient, log, cb) {
        const reducedFields = {
            '_id': 1,
            'value.versionId': 1,
            'value.replicationInfo.status': 1,
            'value.replicationInfo.backends': 1,
            'value.content-length': 1,
            'value.dataStoreName': 1,
        };
    _handleData(data, repData) {
        const retResult = this._handleEntries(data);
        const repDataEntries = this._handleEntries(repData);
        Object.keys(repDataEntries).forEach(site => {
            if (!retResult[site]) {
                retResult[site] = 0;
            }
            retResult[site] += repDataEntries[site];
        });
        return retResult;
    }

        const aggCount = [
            { $project: { _id: 1 } },
            { $group: { _id: null, count: { $sum: 1 } } },
    _createCountPipeline(filter) {
        return [
            { $match: filter },
            { $count: 'count' },
        ];
    }

        const aggData = [
            { $project: {
                'value.dataStoreName': 1,
                'value.content-length': 1,
            } },
            { $group: {
                _id: '$value.dataStoreName',
                bytes: { $sum: '$value.content-length' },
            } },
        ];

        const aggRepData = [
            { $project: {
                'value.replicationInfo.backends': 1,
                'value.content-length': 1,
            } },
    _createRepDataPipeline(filter) {
        return [
            { $match: filter },
            { $unwind: '$value.replicationInfo.backends' },
            { $match: {
                'value.replicationInfo.backends.status': { $eq: 'COMPLETED' },

@ -1469,155 +1454,119 @@

                bytes: { $sum: '$value.content-length' },
            } },
        ];
    }

        const aggCompleted = [
            { $project: {
                'value.dataStoreName': 1,
                'value.content-length': 1,
                'inComplete': {
                    $eq: ['$value.replicationInfo.status', 'COMPLETED'],
                },
            } },
            { $match: { inComplete: true } },
    _createDataPipeline(isTransient, filter) {
        const matchStage = !isTransient
            ? filter
            : Object.assign(
                {},
                filter,
                { 'value.replicationInfo.status': { $ne: 'COMPLETED' } }
            );
        return [
            { $match: matchStage },
            { $group: {
                _id: '$value.dataStoreName',
                bytes: { $sum: '$value.content-length' },
            } },
        ];

        return c.aggregate([
            { $project: reducedFields },
            { $match: filter },
            { $facet: {
                count: aggCount,
                data: aggData,
                repData: aggRepData,
                compData: aggCompleted,
            } },
        ]).toArray((err, res) => {
            if (err) {
                log.error('Error when processing mongo entries', {
                    method: '_handleMongo',
                    error: err,
                });
                return cb(err);
            }
            if (!res || res.length < 1) {
                log.debug('aggregate returned empty results', {
                    method: '_handleMongo',
                });
                return cb(null, {});
            }
            const agg = res[0];
            if (!agg || !agg.count || !agg.data || !agg.repData) {
                log.debug('missing field in aggregate results', {
                    method: '_handleMongo',
                });
                return cb(null, {});
            }
            const retResult = {
                count: this._handleCount(agg.count[0] || {}),
                data: this._handleEntries(agg.data),
            };
            const repDataEntries = this._handleEntries(agg.repData);
            Object.keys(repDataEntries).forEach(site => {
                if (!retResult.data[site]) {
                    retResult.data[site] = 0;
                }
                retResult.data[site] += repDataEntries[site];
            });
            if (isTransient && agg.compData) {
                const compDataEntries = this._handleEntries(agg.compData);
                Object.keys(compDataEntries).forEach(site => {
                    if (retResult.data[site]) {
                        retResult.data[site] -= compDataEntries[site];
                        retResult.data[site] =
                            Math.max(0, retResult.data[site]);
                    }
                });
            }
            return cb(null, retResult);
        });
    }

    _getStalled(c, cmpDate, log, cb) {
        const reducedFields = {
            '_id': {
                id: '$_id',
                status: '$value.replicationInfo.status',
            },
            'value.last-modified': 1,
    _aggregateCount(c, params, callback) {
        const { cmpDate, isTransient } = params;
        const mstFilter = {
            'isVersioned': { $eq: -1 },
            'value.versionId': { $exists: true },
        };
        const verFilter = { isVersioned: { $ne: -1 } };
        const nullFilter = {
            'isVersioned': { $eq: -1 },
            'value.versionId': { $exists: false },
        };
        return c.aggregate([
            { $project: reducedFields },
            { $match: {
                '_id.id': { $regex: /\0/ },
                '_id.status': { $eq: 'PENDING' },
            } },
        ]).toArray((err, res) => {
            {
                $project: {
                    '_id': 1,
                    'value.replicationInfo': 1,
                    'value.last-modified': {
                        $dateFromString: { dateString: '$value.last-modified' },
                    },
                    'value.dataStoreName': 1,
                    'value.content-length': 1,
                    'value.versionId': 1,
                    'isVersioned': { $indexOfCP: ['$_id', '\0'] },
                },
            },
            {
                $facet: {
                    stalled: [
                        { $match: Object.assign(
                            {},
                            verFilter,
                            {
                                'value.replicationInfo.status': { $in: [
                                    'PENDING',
                                ] },
                                'value.last-modified': {
                                    $lt: cmpDate,
                                },
                            }
                        ) },
                        { $count: 'count' },
                    ],
                    nullCount: this._createCountPipeline(nullFilter),
                    nullRepData: this._createRepDataPipeline(nullFilter),
                    nullData: this._createDataPipeline(isTransient, nullFilter),
                    mstCount: this._createCountPipeline(mstFilter),
                    mstRepData: this._createRepDataPipeline(mstFilter),
                    mstData: this._createDataPipeline(isTransient, mstFilter),
                    verCount: this._createCountPipeline(verFilter),
                    verRepData: this._createRepDataPipeline(verFilter),
                    verData: this._createDataPipeline(isTransient, verFilter),
                },
            },
        ], { allowDiskUse: true }).toArray((err, res) => {
            if (err) {
                log.debug('Unable to retrieve stalled entries', {
                    error: err,
                });
                return cb(null);
                return callback(err);
            }
            const count = res.filter(data => {
                if (!data || typeof data !== 'object' ||
                    !data.value || typeof data.value !== 'object') {
                    return false;
                }
                const time = data.value['last-modified'] || null;
                if (isNaN(Date.parse(time))) {
                    return false;
                }
                const testDate = new Date(time);
                return testDate <= cmpDate;
            }).length;
            return cb(null, count);
            const resObj = {
                masterCount: this._handleCount(res[0].mstCount[0]),
                masterData: this._handleData(res[0].mstData,
                    res[0].mstRepData),
                nullCount: this._handleCount(res[0].nullCount[0]),
                nullData: this._handleData(res[0].nullData,
                    res[0].nullRepData),
                versionCount: this._handleCount(res[0].verCount[0]),
                versionData: this._handleData(res[0].verData,
                    res[0].verRepData),
                stalled: this._handleCount(res[0].stalled[0]),
            };
            return callback(null, resObj);
        });
    }

    getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
        const c = this.getCollection(bucketName);
        const mstFilter = {
            '_id': { $regex: /^[^\0]+$/ },
            'value.versionId': { $exists: true },
        };
        const verFilter = { _id: { $regex: /\0/ } };
        const nullFilter = {
            '_id': { $regex: /^[^\0]+$/ },
            'value.versionId': { $exists: false },
        };

        const cmpDate = new Date();
        cmpDate.setHours(cmpDate.getHours() - 1);

        async.parallel({
            version: done =>
                this._handleMongo(c, verFilter, isTransient, log, done),
            null: done =>
                this._handleMongo(c, nullFilter, isTransient, log, done),
            master: done =>
                this._handleMongo(c, mstFilter, isTransient, log, done),
            stalled: done =>
                this._getStalled(c, cmpDate, log, done),
        }, (err, res) => {
        const params = {
            cmpDate,
            isTransient,
        };
        return this._aggregateCount(c, params, (err, res) => {
            if (err) {
                log.error('Error when performing count items aggregate', {
                    method: 'getObjectMDStats',
                    error: err,
                });
                return callback(err);
            }
            const resObj = {
                masterCount: res.master.count || 0,
                masterData: res.master.data || {},
                nullCount: res.null.count || 0,
                nullData: res.null.data || {},
                versionCount: res.version.count || 0,
                versionData: res.version.data || {},
            };
            const bucketStatus = bucketInfo.getVersioningConfiguration();
            const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
                bucketStatus.Status === 'Suspended'));
            const retResult = this._handleResults(resObj, isVer);
            retResult.stalled = res.stalled || 0;
            const retResult = this._handleResults(res, isVer);
            retResult.stalled = res.stalled;
            return callback(null, retResult);
        });
    }

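One note on the new `_aggregateCount` before the test files: it replaces the per-filter `$regex` matches of the old `getObjectMDStats` with a single `$indexOfCP` projection, so master, version, and null keys are told apart in one pass. A small runnable stand-in for that classification, assuming the usual versioned key layout (`<objectKey>\0<versionId>`):

// A minimal stand-in for the pipeline's key classification, assuming the
// same key layout the $indexOfCP stage relies on.
function classifyKey(doc) {
    const isVersioned = doc._id.indexOf('\0'); // roughly $indexOfCP
    if (isVersioned !== -1) {
        return 'version'; // verFilter: isVersioned !== -1
    }
    return doc.value.versionId !== undefined
        ? 'master' // mstFilter: no '\0' but carries a versionId
        : 'null';  // nullFilter: no '\0' and no versionId
}

classifyKey({ _id: 'obj\u0000123', value: {} });          // 'version'
classifyKey({ _id: 'obj', value: { versionId: '123' } }); // 'master'
classifyKey({ _id: 'obj', value: {} });                   // 'null'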
@ -1,507 +0,0 @@
const assert = require('assert');

const {
    NEW_OBJ,
    NEW_VER,
    UPDATE_VER,
    UPDATE_MST,
    RESTORE,
    DEL_VER,
    DEL_MST,
    DataCounter,
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');

const refZeroObj = {
    objects: 0,
    versions: 0,
    dataManaged: {
        total: { curr: 0, prev: 0 },
        byLocation: {},
    },
    stalled: 0,
};

const refSingleObj = {
    objects: 2,
    versions: 0,
    dataManaged: {
        total: { curr: 200, prev: 0 },
        byLocation: {
            locationOne: { curr: 200, prev: 0 },
        },
    },
    stalled: 0,
};

const refSingleObjVer = {
    objects: 1,
    versions: 1,
    dataManaged: {
        total: { curr: 100, prev: 100 },
        byLocation: {
            locationOne: { curr: 100, prev: 100 },
        },
    },
    stalled: 0,
};

const refMultiObjVer = {
    objects: 1,
    versions: 1,
    dataManaged: {
        total: { curr: 200, prev: 200 },
        byLocation: {
            locationOne: { curr: 100, prev: 100 },
            locationTwo: { curr: 100, prev: 100 },
        },
    },
    stalled: 0,
};

const refMultiObj = {
    objects: 2,
    versions: 0,
    dataManaged: {
        total: { curr: 400, prev: 0 },
        byLocation: {
            locationOne: { curr: 200, prev: 0 },
            locationTwo: { curr: 200, prev: 0 },
        },
    },
    stalled: 0,
};

const singleSite = size => ({
    'content-length': size,
    'dataStoreName': 'locationOne',
    'replicationInfo': {
        backends: [],
    },
});

const multiSite = (size, isComplete) => ({
    'content-length': size,
    'dataStoreName': 'locationOne',
    'replicationInfo': {
        backends: [{
            site: 'locationTwo',
            status: isComplete ? 'COMPLETED' : 'PENDING',
        }],
    },
});

const transientSite = (size, status, backends) => ({
    'content-length': size,
    'dataStoreName': 'locationOne',
    'replicationInfo': { status, backends },
});

const locationConstraints = {
    locationOne: { isTransient: true },
    locationTwo: { isTransient: false },
};

const dataCounter = new DataCounter();

describe('DataCounter Class', () => {
    it('should create a zero object', () => {
        dataCounter.set(refZeroObj);
        assert.deepStrictEqual(dataCounter.results(), refZeroObj);
    });

    it('should skip dataCounter methods if initial values are not set', () => {
        const testCounter = new DataCounter();
        testCounter.addObject(singleSite(100), null, NEW_OBJ);
        assert.deepStrictEqual(testCounter.results(), refZeroObj);
    });
});

describe('DataCounter::updateTransientList', () => {
    afterEach(() => dataCounter.updateTransientList({}));
    it('should set transient list', () => {
        assert.deepStrictEqual(dataCounter.transientList, {});
        dataCounter.updateTransientList(locationConstraints);
        const expectedRes = { locationOne: true, locationTwo: false };
        assert.deepStrictEqual(dataCounter.transientList, expectedRes);
    });
});

describe('DataCounter::addObject', () => {
    const tests = [
        {
            it: 'should correctly update DataCounter, new object one site',
            init: refZeroObj,
            input: [singleSite(100), null, NEW_OBJ],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 100, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, new object multi site',
            init: refZeroObj,
            input: [multiSite(100, true), null, NEW_OBJ],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 200, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, overwrite single site',
            init: refSingleObj,
            input: [singleSite(100), singleSite(50), NEW_OBJ],
            expectedRes: {
                objects: 2, versions: 0,
                dataManaged: {
                    total: { curr: 250, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 250, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, overwrite multi site',
            init: refMultiObj,
            input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
            expectedRes: {
                objects: 2, versions: 0,
                dataManaged: {
                    total: { curr: 500, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 250, prev: 0 },
                        locationTwo: { curr: 250, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, new version single site',
            init: refSingleObj,
            input: [singleSite(100), singleSite(50), NEW_VER],
            expectedRes: {
                objects: 2, versions: 1,
                dataManaged: {
                    total: { curr: 250, prev: 50 },
                    byLocation: {
                        locationOne: { curr: 250, prev: 50 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, new version multi site',
            init: refMultiObj,
            input: [multiSite(100, true), multiSite(50, true), NEW_VER],
            expectedRes: {
                objects: 2, versions: 1,
                dataManaged: {
                    total: { curr: 500, prev: 100 },
                    byLocation: {
                        locationOne: { curr: 250, prev: 50 },
                        locationTwo: { curr: 250, prev: 50 },
                    },
                },
            },
        },
        {
            it: 'should correctly ignore pending status, multi site',
            init: refZeroObj,
            input: [multiSite(100, false), null, NEW_OBJ],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 100, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'replication completion update in master object',
            init: refSingleObj,
            input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
            expectedRes: {
                objects: 2, versions: 0,
                dataManaged: {
                    total: { curr: 300, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 200, prev: 0 },
                        locationTwo: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'replication completion update in versioned object',
            init: refSingleObjVer,
            input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
            expectedRes: {
                objects: 1, versions: 1,
                dataManaged: {
                    total: { curr: 100, prev: 200 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 100 },
                        locationTwo: { curr: 0, prev: 100 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'restoring versioned object as master',
            init: refMultiObjVer,
            input: [multiSite(100, true), multiSite(100, true), RESTORE],
            expectedRes: {
                objects: 2, versions: 0,
                dataManaged: {
                    total: { curr: 400, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 200, prev: 0 },
                        locationTwo: { curr: 200, prev: 0 },
                    },
                },
            },
        },
    ];
    tests.forEach(test => it(test.it, () => {
        const { expectedRes, input, init } = test;
        dataCounter.set(init);
        dataCounter.addObject(...input);
        const testResults = dataCounter.results();
        Object.keys(expectedRes).forEach(key => {
            if (typeof expectedRes[key] === 'object') {
                assert.deepStrictEqual(testResults[key], expectedRes[key]);
            } else {
                assert.strictEqual(testResults[key], expectedRes[key]);
            }
        });
    }));
});

describe('DataCounter, update with transient location', () => {
    before(() => dataCounter.updateTransientList(locationConstraints));
    after(() => dataCounter.updateTransientList({}));

    const pCurrMD = transientSite(100, 'PENDING', [
        { site: 'site1', status: 'PENDING' },
        { site: 'site2', status: 'COMPLETED' },
    ]);
    const cCurrMD = transientSite(100, 'COMPLETED', [
        { site: 'site1', status: 'COMPLETED' },
        { site: 'site2', status: 'COMPLETED' },
    ]);
    const prevMD = transientSite(100, 'PENDING', [
        { site: 'site1', status: 'PENDING' },
        { site: 'site2', status: 'PENDING' },
    ]);
    const transientTest = [
        {
            it: 'should correctly update DataCounter, ' +
            'version object, replication status = PENDING',
            init: refSingleObjVer,
            input: [pCurrMD, prevMD, UPDATE_VER],
            expectedRes: {
                objects: 1, versions: 1,
                dataManaged: {
                    total: { curr: 100, prev: 200 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 100 },
                        site2: { curr: 0, prev: 100 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'version object, replication status = COMPLETED',
            init: refSingleObjVer,
            input: [cCurrMD, prevMD, UPDATE_VER],
            expectedRes: {
                objects: 1, versions: 1,
                dataManaged: {
                    total: { curr: 100, prev: 200 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        site1: { curr: 0, prev: 100 },
                        site2: { curr: 0, prev: 100 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'master object, replication status = PENDING',
            init: refSingleObjVer,
            input: [pCurrMD, prevMD, UPDATE_MST],
            expectedRes: {
                objects: 1, versions: 1,
                dataManaged: {
                    total: { curr: 200, prev: 100 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 100 },
                        site2: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'master object, replication status = COMPLETED',
            init: refSingleObjVer,
            input: [cCurrMD, prevMD, UPDATE_MST],
            expectedRes: {
                objects: 1, versions: 1,
                dataManaged: {
                    total: { curr: 200, prev: 100 },
                    byLocation: {
                        locationOne: { curr: 0, prev: 100 },
                        site1: { curr: 100, prev: 0 },
                        site2: { curr: 100, prev: 0 },
                    },
                },
            },
        },
    ];

    transientTest.forEach(test => it(test.it, () => {
        const { expectedRes, input, init } = test;
        dataCounter.set(init);
        dataCounter.addObject(...input);
        const testResults = dataCounter.results();
        Object.keys(expectedRes).forEach(key => {
            if (typeof expectedRes[key] === 'object') {
                assert.deepStrictEqual(testResults[key], expectedRes[key]);
            } else {
                assert.strictEqual(testResults[key], expectedRes[key]);
            }
        });
    }));
});

describe('DataCounter::delObject', () => {
    const tests = [
        {
            it: 'should correctly update DataCounter, ' +
            'delete master object single site',
            init: refMultiObj,
            input: [singleSite(100), DEL_MST],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 300, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 200, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'delete master object multi site',
            init: refMultiObj,
            input: [multiSite(100, true), DEL_MST],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 200, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'delete versioned object single site',
            init: refMultiObjVer,
            input: [singleSite(100), DEL_VER],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 200, prev: 100 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 100, prev: 100 },
                    },
                },
            },
        },
        {
            it: 'should correctly update DataCounter, ' +
            'delete versioned object multi site',
            init: refMultiObjVer,
            input: [multiSite(100, true), DEL_VER],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 200, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 100, prev: 0 },
                    },
                },
            },
        },
        {
            it: 'should clamp negative values to 0, master object',
            init: refMultiObjVer,
            input: [multiSite(300, true), DEL_MST],
            expectedRes: {
                objects: 0, versions: 1,
                dataManaged: {
                    total: { curr: 0, prev: 200 },
                    byLocation: {
                        locationOne: { curr: 0, prev: 100 },
                        locationTwo: { curr: 0, prev: 100 },
                    },
                },
            },
        },
        {
            it: 'should clamp negative values to 0, versioned object',
            init: refMultiObjVer,
            input: [multiSite(300, true), DEL_VER],
            expectedRes: {
                objects: 1, versions: 0,
                dataManaged: {
                    total: { curr: 200, prev: 0 },
                    byLocation: {
                        locationOne: { curr: 100, prev: 0 },
                        locationTwo: { curr: 100, prev: 0 },
                    },
                },
            },
        },
    ];

    tests.forEach(test => it(test.it, () => {
        const { expectedRes, input, init } = test;
        dataCounter.set(init);
        dataCounter.delObject(...input);
        const testResults = dataCounter.results();
        Object.keys(expectedRes).forEach(key => {
            if (typeof expectedRes[key] === 'object') {
                assert.deepStrictEqual(testResults[key], expectedRes[key]);
            } else {
                assert.strictEqual(testResults[key], expectedRes[key]);
            }
        });
    }));
});
@ -2,47 +2,11 @@ const assert = require('assert');

const MongoClientInterface = require(
    '../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const DummyMongoDB = require('./utils/DummyMongoDB');
const DummyConfigObject = require('./utils/DummyConfigObject');
const DummyRequestLogger = require('./utils/DummyRequestLogger');

const log = new DummyRequestLogger();
const mongoTestClient = new MongoClientInterface({});
mongoTestClient.db = new DummyMongoDB();

describe('MongoClientInterface, init behavior', () => {
    let s3ConfigObj;
    const locationConstraints = {
        locationOne: { isTransient: true },
        locationTwo: { isTransient: false },
    };

    beforeEach(() => {
        s3ConfigObj = new DummyConfigObject();
    });

    it('should set DataCounter transientList when declaring a ' +
    'new MongoClientInterface object', () => {
        s3ConfigObj.setLocationConstraints(locationConstraints);
        const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
        const expectedRes = { locationOne: true, locationTwo: false };
        assert.deepStrictEqual(
            mongoClient.dataCount.transientList, expectedRes);
    });

    it('should update DataCounter transientList if location constraints ' +
    'are updated', done => {
        const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
        assert.deepStrictEqual(mongoClient.dataCount.transientList, {});
        const expectedRes = { locationOne: true, locationTwo: false };
        s3ConfigObj.once('MongoClientTestDone', () => {
            assert.deepStrictEqual(
                mongoClient.dataCount.transientList, expectedRes);
            return done();
        });
        s3ConfigObj.setLocationConstraints(locationConstraints);
    });
});

describe('MongoClientInterface::_handleResults', () => {
    it('should return zero-result', () => {

@ -103,149 +67,9 @@ describe('MongoClientInterface::_handleResults', () => {

    });
});

describe('MongoClientInterface::_handleMongo', () => {
    beforeEach(() => mongoTestClient.db.reset());

    it('should return error if mongo aggregate fails', done => {
        const retValues = [new Error('testError')];
        mongoTestClient.db.setReturnValues(retValues);
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, false, log, err => {
            assert(err, 'Expected error, but got success');
            return done();
        });
    });

    it('should return empty object if mongo aggregate has no results', done => {
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, false, log,
        (err, res) => {
            assert.ifError(err, `Expected success, but got error ${err}`);
            assert.deepStrictEqual(res, {});
            return done();
        });
    });

    it('should return empty object if mongo aggregate has missing results',
    done => {
        const retValues = [[{
            count: undefined,
            data: undefined,
            repData: undefined,
        }]];
        mongoTestClient.db.setReturnValues(retValues);
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, false, log,
        (err, res) => {
            assert.ifError(err, `Expected success, but got error ${err}`);
            assert.deepStrictEqual(res, {});
            return done();
        });
    });

    const testRetValues = [[{
        count: [{ _id: null, count: 100 }],
        data: [
            { _id: 'locationone', bytes: 1000 },
            { _id: 'locationtwo', bytes: 1000 },
        ],
        repData: [
            { _id: 'awsbackend', bytes: 500 },
            { _id: 'azurebackend', bytes: 500 },
            { _id: 'gcpbackend', bytes: 500 },
        ],
        compData: [
            { _id: 'locationone', bytes: 500 },
            { _id: 'locationtwo', bytes: 500 },
        ],
    }]];

    it('should return correct results, transient false', done => {
        mongoTestClient.db.setReturnValues(testRetValues);
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, false, log,
        (err, res) => {
            assert.ifError(err, `Expected success, but got error ${err}`);
            assert.deepStrictEqual(res, {
                count: 100,
                data: {
                    locationone: 1000,
                    locationtwo: 1000,
                    awsbackend: 500,
                    azurebackend: 500,
                    gcpbackend: 500,
                },
            });
            return done();
        });
    });

    it('should return correct results, transient true', done => {
        mongoTestClient.db.setReturnValues(testRetValues);
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, true, log,
        (err, res) => {
            assert.ifError(err, `Expected success, but got error ${err}`);
            assert.deepStrictEqual(res, {
                count: 100,
                data: {
                    locationone: 500,
                    locationtwo: 500,
                    awsbackend: 500,
                    azurebackend: 500,
                    gcpbackend: 500,
                },
            });
            return done();
        });
    });

    const testRetValuesNeg = [[{
        count: [{ _id: null, count: 100 }],
        data: [
            { _id: 'locationone', bytes: 100 },
            { _id: 'locationtwo', bytes: 100 },
        ],
        repData: [
            { _id: 'awsbackend', bytes: 500 },
            { _id: 'azurebackend', bytes: 500 },
            { _id: 'gcpbackend', bytes: 500 },
        ],
        compData: [
            { _id: 'locationone', bytes: 500 },
            { _id: 'locationtwo', bytes: 500 },
        ],
    }]];
    it('should clamp negative values to 0', done => {
        mongoTestClient.db.setReturnValues(testRetValuesNeg);
        const testCollection = mongoTestClient.db.collection('test');
        mongoTestClient._handleMongo(testCollection, {}, true, log,
        (err, res) => {
            assert.ifError(err, `Expected success, but got error ${err}`);
            assert.deepStrictEqual(res, {
                count: 100,
                data: {
                    locationone: 0,
                    locationtwo: 0,
                    awsbackend: 500,
                    azurebackend: 500,
                    gcpbackend: 500,
                },
            });
            return done();
        });
    });
});

describe('MongoClientInterface, misc', () => {
    let s3ConfigObj;

    beforeEach(() => {
        s3ConfigObj = new DummyConfigObject();
    });

    it('should filter out collections with special names', () => {
        const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
        const mongoClient = new MongoClientInterface({});
        assert.equal(mongoClient._isSpecialCollection('__foo'), true);
        assert.equal(mongoClient._isSpecialCollection('bar'), false);
    });

@ -1,103 +0,0 @@
const testError = new Error('test error');

class DummyCollection {
    constructor(name, isFail) {
        this.s = {
            name,
        };
        this.fail = isFail;
        this.retQueue = [];
    }

    setReturnValues(retArray) {
        this.retQueue.push(...retArray);
    }

    aggregate() {
        return {
            toArray: cb => {
                if (this.retQueue.length <= 0) {
                    return cb(null, []);
                }
                const retVal = this.retQueue[0];
                this.retQueue = this.retQueue.slice(1);
                if (retVal instanceof Error) {
                    return cb(retVal);
                }
                return cb(null, retVal);
            },
        };
    }

    bulkWrite(cmds, opt, cb) {
        process.stdout.write('mock mongodb.bulkWrite call\n');
        if (this.fail) {
            return cb(testError);
        }
        return cb();
    }

    update(target, doc, opt, cb) {
        process.stdout.write('mock mongodb.update call\n');
        if (this.fail) {
            return cb(testError);
        }
        return cb();
    }

    find() {
        return {
            toArray: cb => {
                if (this.retQueue.length <= 0) {
                    return cb(null, []);
                }
                const retVal = this.retQueue[0];
                this.retQueue = this.retQueue.slice(1);
                if (retVal instanceof Error) {
                    return cb(retVal);
                }
                return cb(null, retVal);
            },
        };
    }

    findOne(query, opt, cb) {
        if (typeof opt === 'function' && cb === undefined) {
            // eslint-disable-next-line no-param-reassign
            cb = opt;
        }
        if (this.retQueue.length <= 0) {
            return cb(null);
        }
        const retVal = this.retQueue[0];
        this.retQueue = this.retQueue.slice(1);
        if (retVal instanceof Error) {
            return cb(retVal);
        }
        return cb(null, retVal);
    }
}

class DummyMongoDB {
    constructor() {
        this.fail = false;
        this.returnQueue = [];
    }

    reset() {
        this.fail = false;
        this.returnQueue = [];
    }

    setReturnValues(retValues) {
        this.returnQueue.push(...retValues);
    }

    collection(name) {
        const c = new DummyCollection(name, this.fail);
        c.setReturnValues(this.returnQueue);
        return c;
    }
}

module.exports = DummyMongoDB;
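For reference, a minimal sketch of how this mock is driven by the unit tests above: queued values are handed back one per call, and an Error placed in the queue surfaces through the callback's error argument instead:

const DummyMongoDB = require('./utils/DummyMongoDB');

const db = new DummyMongoDB();
// Queue one aggregate() result; queuing an Error would make it come back
// as `err` instead of `res`.
db.setReturnValues([[{ count: [{ _id: null, count: 1 }] }]]);

db.collection('test').aggregate().toArray((err, res) => {
    // err === null
    // res === [{ count: [{ _id: null, count: 1 }] }]
});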