Compare commits
No commits in common. "2f8af6352cbf0e79a81ec0b69312c5e8ae2e502d" and "ad58f6698162b11ddf44452e4719bf11f26ade2e" have entirely different histories.
2f8af6352c
...
ad58f66981
|
@ -11,6 +11,7 @@
|
||||||
*/
|
*/
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
|
||||||
|
const { EventEmitter } = require('events');
|
||||||
const constants = require('../../../constants');
|
const constants = require('../../../constants');
|
||||||
|
|
||||||
const { reshapeExceptionError } = require('../../../errorUtils');
|
const { reshapeExceptionError } = require('../../../errorUtils');
|
||||||
|
@ -26,6 +27,7 @@ const listAlgos = require('../../../algos/list/exportAlgos');
|
||||||
|
|
||||||
const MongoReadStream = require('./readStream');
|
const MongoReadStream = require('./readStream');
|
||||||
const MongoUtils = require('./utils');
|
const MongoUtils = require('./utils');
|
||||||
|
const { DataCounter } = require('./DataCounter');
|
||||||
const Skip = require('../../../algos/list/skip');
|
const Skip = require('../../../algos/list/skip');
|
||||||
|
|
||||||
const USERSBUCKET = '__usersbucket';
|
const USERSBUCKET = '__usersbucket';
|
||||||
|
@ -38,7 +40,6 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||||
const CONNECT_TIMEOUT_MS = 5000;
|
const CONNECT_TIMEOUT_MS = 5000;
|
||||||
// MongoDB default
|
// MongoDB default
|
||||||
const SOCKET_TIMEOUT_MS = 360000;
|
const SOCKET_TIMEOUT_MS = 360000;
|
||||||
const CONCURRENT_CURSORS = 10;
|
|
||||||
|
|
||||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||||
|
|
||||||
|
@ -91,7 +92,7 @@ function generatePHDVersion(versionId) {
|
||||||
class MongoClientInterface {
|
class MongoClientInterface {
|
||||||
constructor(params) {
|
constructor(params) {
|
||||||
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
|
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
|
||||||
database, logger, replicationGroupId, authCredentials,
|
database, logger, replicationGroupId, authCredentials, config,
|
||||||
isLocationTransient } = params;
|
isLocationTransient } = params;
|
||||||
const cred = MongoUtils.credPrefix(authCredentials);
|
const cred = MongoUtils.credPrefix(authCredentials);
|
||||||
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
|
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
|
||||||
|
@ -103,7 +104,20 @@ class MongoClientInterface {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
this.replicationGroupId = replicationGroupId;
|
this.replicationGroupId = replicationGroupId;
|
||||||
this.database = database;
|
this.database = database;
|
||||||
|
this.dataCount = new DataCounter();
|
||||||
this.isLocationTransient = isLocationTransient;
|
this.isLocationTransient = isLocationTransient;
|
||||||
|
|
||||||
|
if (config && config instanceof EventEmitter) {
|
||||||
|
this.config = config;
|
||||||
|
this.config.on('location-constraints-update', () => {
|
||||||
|
this.dataCount
|
||||||
|
.updateTransientList(this.config.locationConstraints);
|
||||||
|
if (this.config.isTest) {
|
||||||
|
this.config.emit('MongoClientTestDone');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
this.dataCount.updateTransientList(this.config.locationConstraints);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
setup(cb) {
|
setup(cb) {
|
||||||
|
@ -122,10 +136,6 @@ class MongoClientInterface {
|
||||||
const socketTimeoutMS = Number.parseInt(
|
const socketTimeoutMS = Number.parseInt(
|
||||||
process.env.MONGO_SOCKET_TIMEOUT_MS, 10) || SOCKET_TIMEOUT_MS;
|
process.env.MONGO_SOCKET_TIMEOUT_MS, 10) || SOCKET_TIMEOUT_MS;
|
||||||
const options = { connectTimeoutMS, socketTimeoutMS };
|
const options = { connectTimeoutMS, socketTimeoutMS };
|
||||||
if (process.env.MONGO_POOL_SIZE &&
|
|
||||||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
|
||||||
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
|
||||||
}
|
|
||||||
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this.logger.error('error connecting to mongodb',
|
this.logger.error('error connecting to mongodb',
|
||||||
|
@ -1139,7 +1149,7 @@ class MongoClientInterface {
|
||||||
* get bucket related information for count items, used by cloudserver
|
* get bucket related information for count items, used by cloudserver
|
||||||
* and s3utils
|
* and s3utils
|
||||||
*/
|
*/
|
||||||
getBucketInfos(log, cb) {
|
_getBucketInfos(log, cb) {
|
||||||
let bucketCount = 0;
|
let bucketCount = 0;
|
||||||
const bucketInfos = [];
|
const bucketInfos = [];
|
||||||
|
|
||||||
|
@ -1188,7 +1198,7 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
countItems(log, cb) {
|
countItems(log, cb) {
|
||||||
this.getBucketInfos(log, (err, res) => {
|
this._getBucketInfos(log, (err, res) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('error getting bucket info', {
|
log.error('error getting bucket info', {
|
||||||
method: 'countItems',
|
method: 'countItems',
|
||||||
|
@ -1220,7 +1230,20 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
consolidateData(store, dataManaged) {
|
scanItemCount(log, cb) {
|
||||||
|
const store = {
|
||||||
|
objects: 0,
|
||||||
|
versions: 0,
|
||||||
|
buckets: 0,
|
||||||
|
bucketList: [],
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 0, prev: 0 },
|
||||||
|
byLocation: {},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const consolidateData = dataManaged => {
|
||||||
if (dataManaged && dataManaged.locations && dataManaged.total) {
|
if (dataManaged && dataManaged.locations && dataManaged.total) {
|
||||||
const locations = dataManaged.locations;
|
const locations = dataManaged.locations;
|
||||||
store.dataManaged.total.curr += dataManaged.total.curr;
|
store.dataManaged.total.curr += dataManaged.total.curr;
|
||||||
|
@ -1237,25 +1260,9 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
scanItemCount(log, cb) {
|
|
||||||
const store = {
|
|
||||||
objects: 0,
|
|
||||||
versions: 0,
|
|
||||||
buckets: 0,
|
|
||||||
bucketList: [],
|
|
||||||
dataManaged: {
|
|
||||||
total: { curr: 0, prev: 0 },
|
|
||||||
byLocation: {},
|
|
||||||
},
|
|
||||||
stalled: 0,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const consolidateData = (dataManaged) =>
|
this._getBucketInfos(log, (err, res) => {
|
||||||
this.consolidateData(store, dataManaged);
|
|
||||||
|
|
||||||
this.getBucketInfos(log, (err, res) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('error getting bucket info', {
|
log.error('error getting bucket info', {
|
||||||
method: 'scanItemCount',
|
method: 'scanItemCount',
|
||||||
|
@ -1276,15 +1283,7 @@ class MongoClientInterface {
|
||||||
store.buckets = bucketCount;
|
store.buckets = bucketCount;
|
||||||
store.bucketList = retBucketInfos;
|
store.bucketList = retBucketInfos;
|
||||||
|
|
||||||
let concurrentCursors = CONCURRENT_CURSORS;
|
return async.eachLimit(bucketInfos, 10, (bucketInfo, done) => {
|
||||||
if (process.env.CONCURRENT_CURSORS &&
|
|
||||||
!Number.isNaN(process.env.CONCURRENT_CURSORS)) {
|
|
||||||
concurrentCursors = Number
|
|
||||||
.parseInt(process.env.CONCURRENT_CURSORS, 10);
|
|
||||||
}
|
|
||||||
|
|
||||||
return async.eachLimit(bucketInfos, concurrentCursors,
|
|
||||||
(bucketInfo, done) => {
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
next => this._getIsTransient(bucketInfo, log, next),
|
next => this._getIsTransient(bucketInfo, log, next),
|
||||||
(isTransient, next) => {
|
(isTransient, next) => {
|
||||||
|
@ -1331,6 +1330,7 @@ class MongoClientInterface {
|
||||||
this.isLocationTransient(locConstraint, log, cb);
|
this.isLocationTransient(locConstraint, log, cb);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this._pensieveLocationIsTransient(locConstraint, log, cb);
|
this._pensieveLocationIsTransient(locConstraint, log, cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1415,136 +1415,211 @@ class MongoClientInterface {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
_handleCount(entry) {
|
||||||
* @param{object} entry -
|
return entry && entry.count > 0 ? entry.count : 0;
|
||||||
* @param{string} entry._id -
|
}
|
||||||
* @param{object} entry.value -
|
|
||||||
* @param{boolean} isTransient -
|
_handleEntries(entries) {
|
||||||
* @returns{object.<string, number>} results -
|
|
||||||
*/
|
|
||||||
_processEntryData(entry, isTransient) {
|
|
||||||
const results = {};
|
const results = {};
|
||||||
if (!isTransient ||
|
if (entries) {
|
||||||
entry.value.replicationInfo.status !== 'COMPLETED') {
|
entries.forEach(entry => {
|
||||||
if (results[entry.value.dataStoreName]) {
|
results[entry._id] = entry.bytes;
|
||||||
results[entry.value.dataStoreName] +=
|
|
||||||
entry.value['content-length'];
|
|
||||||
} else {
|
|
||||||
results[entry.value.dataStoreName] =
|
|
||||||
entry.value['content-length'];
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (!results[entry.value.dataStoreName]) {
|
|
||||||
results[entry.value.dataStoreName] = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
entry.value.replicationInfo.backends.forEach(rep => {
|
|
||||||
if (rep.status === 'COMPLETED') {
|
|
||||||
if (results[rep.site]) {
|
|
||||||
results[rep.site] += entry.value['content-length'];
|
|
||||||
} else {
|
|
||||||
results[rep.site] = entry.value['content-length'];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
}
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
_handleMongo(c, filter, isTransient, log, cb) {
|
||||||
* @param{object} entry -
|
const reducedFields = {
|
||||||
* @param{string} entry._id -
|
|
||||||
* @param{object} entry.value -
|
|
||||||
* @param{Date} cmpDate -
|
|
||||||
* @returns{boolean} stalled -
|
|
||||||
*/
|
|
||||||
_processEntryStalled(entry, cmpDate) {
|
|
||||||
if (entry.value.replicationInfo.status === 'PENDING') {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const lastModified = Date.parse(entry.value['last-modified'] || null);
|
|
||||||
if (isNaN(lastModified) || new Date(lastModified) > cmpDate) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* scan and process a single collection (bucket)
|
|
||||||
*/
|
|
||||||
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
|
|
||||||
const c = this.getCollection(bucketName);
|
|
||||||
const cursor = c.find({}, {
|
|
||||||
sort: { _id: 1 },
|
|
||||||
projection: {
|
|
||||||
'_id': 1,
|
'_id': 1,
|
||||||
'value.last-modified': 1,
|
'value.versionId': 1,
|
||||||
'value.replicationInfo': 1,
|
'value.replicationInfo.status': 1,
|
||||||
|
'value.replicationInfo.backends': 1,
|
||||||
|
'value.content-length': 1,
|
||||||
|
'value.dataStoreName': 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
const aggCount = [
|
||||||
|
{ $project: { _id: 1 } },
|
||||||
|
{ $group: { _id: null, count: { $sum: 1 } } },
|
||||||
|
];
|
||||||
|
|
||||||
|
const aggData = [
|
||||||
|
{ $project: {
|
||||||
'value.dataStoreName': 1,
|
'value.dataStoreName': 1,
|
||||||
'value.content-length': 1,
|
'value.content-length': 1,
|
||||||
'value.versionId': 1,
|
} },
|
||||||
|
{ $group: {
|
||||||
|
_id: '$value.dataStoreName',
|
||||||
|
bytes: { $sum: '$value.content-length' },
|
||||||
|
} },
|
||||||
|
];
|
||||||
|
|
||||||
|
const aggRepData = [
|
||||||
|
{ $project: {
|
||||||
|
'value.replicationInfo.backends': 1,
|
||||||
|
'value.content-length': 1,
|
||||||
|
} },
|
||||||
|
{ $unwind: '$value.replicationInfo.backends' },
|
||||||
|
{ $match: {
|
||||||
|
'value.replicationInfo.backends.status': { $eq: 'COMPLETED' },
|
||||||
|
} },
|
||||||
|
{ $group: {
|
||||||
|
_id: '$value.replicationInfo.backends.site',
|
||||||
|
bytes: { $sum: '$value.content-length' },
|
||||||
|
} },
|
||||||
|
];
|
||||||
|
|
||||||
|
const aggCompleted = [
|
||||||
|
{ $project: {
|
||||||
|
'value.dataStoreName': 1,
|
||||||
|
'value.content-length': 1,
|
||||||
|
'inComplete': {
|
||||||
|
$eq: ['$value.replicationInfo.status', 'COMPLETED'],
|
||||||
},
|
},
|
||||||
|
} },
|
||||||
|
{ $match: { inComplete: true } },
|
||||||
|
{ $group: {
|
||||||
|
_id: '$value.dataStoreName',
|
||||||
|
bytes: { $sum: '$value.content-length' },
|
||||||
|
} },
|
||||||
|
];
|
||||||
|
|
||||||
|
return c.aggregate([
|
||||||
|
{ $project: reducedFields },
|
||||||
|
{ $match: filter },
|
||||||
|
{ $facet: {
|
||||||
|
count: aggCount,
|
||||||
|
data: aggData,
|
||||||
|
repData: aggRepData,
|
||||||
|
compData: aggCompleted,
|
||||||
|
} },
|
||||||
|
]).toArray((err, res) => {
|
||||||
|
if (err) {
|
||||||
|
log.error('Error when processing mongo entries', {
|
||||||
|
method: '_handleMongo',
|
||||||
|
error: err,
|
||||||
});
|
});
|
||||||
const collRes = {
|
return cb(err);
|
||||||
masterCount: 0,
|
}
|
||||||
masterData: {},
|
if (!res || res.length < 1) {
|
||||||
nullCount: 0,
|
log.debug('aggregate returned empty results', {
|
||||||
nullData: {},
|
method: '_handleMongo',
|
||||||
versionCount: 0,
|
});
|
||||||
versionData: {},
|
return cb(null, {});
|
||||||
|
}
|
||||||
|
const agg = res[0];
|
||||||
|
if (!agg || !agg.count || !agg.data || !agg.repData) {
|
||||||
|
log.debug('missing field in aggregate results', {
|
||||||
|
method: '_handleMongo',
|
||||||
|
});
|
||||||
|
return cb(null, {});
|
||||||
|
}
|
||||||
|
const retResult = {
|
||||||
|
count: this._handleCount(agg.count[0] || {}),
|
||||||
|
data: this._handleEntries(agg.data),
|
||||||
};
|
};
|
||||||
let stalledCount = 0;
|
const repDataEntries = this._handleEntries(agg.repData);
|
||||||
|
Object.keys(repDataEntries).forEach(site => {
|
||||||
|
if (!retResult.data[site]) {
|
||||||
|
retResult.data[site] = 0;
|
||||||
|
}
|
||||||
|
retResult.data[site] += repDataEntries[site];
|
||||||
|
});
|
||||||
|
if (isTransient && agg.compData) {
|
||||||
|
const compDataEntries = this._handleEntries(agg.compData);
|
||||||
|
Object.keys(compDataEntries).forEach(site => {
|
||||||
|
if (retResult.data[site]) {
|
||||||
|
retResult.data[site] -= compDataEntries[site];
|
||||||
|
retResult.data[site] =
|
||||||
|
Math.max(0, retResult.data[site]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return cb(null, retResult);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_getStalled(c, cmpDate, log, cb) {
|
||||||
|
const reducedFields = {
|
||||||
|
'_id': {
|
||||||
|
id: '$_id',
|
||||||
|
status: '$value.replicationInfo.status',
|
||||||
|
},
|
||||||
|
'value.last-modified': 1,
|
||||||
|
};
|
||||||
|
return c.aggregate([
|
||||||
|
{ $project: reducedFields },
|
||||||
|
{ $match: {
|
||||||
|
'_id.id': { $regex: /\0/ },
|
||||||
|
'_id.status': { $eq: 'PENDING' },
|
||||||
|
} },
|
||||||
|
]).toArray((err, res) => {
|
||||||
|
if (err) {
|
||||||
|
log.debug('Unable to retrieve stalled entries', {
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
return cb(null);
|
||||||
|
}
|
||||||
|
const count = res.filter(data => {
|
||||||
|
if (!data || typeof data !== 'object' ||
|
||||||
|
!data.value || typeof data.value !== 'object') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const time = data.value['last-modified'] || null;
|
||||||
|
if (isNaN(Date.parse(time))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const testDate = new Date(time);
|
||||||
|
return testDate <= cmpDate;
|
||||||
|
}).length;
|
||||||
|
return cb(null, count);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
|
||||||
|
const c = this.getCollection(bucketName);
|
||||||
|
const mstFilter = {
|
||||||
|
'_id': { $regex: /^[^\0]+$/ },
|
||||||
|
'value.versionId': { $exists: true },
|
||||||
|
};
|
||||||
|
const verFilter = { _id: { $regex: /\0/ } };
|
||||||
|
const nullFilter = {
|
||||||
|
'_id': { $regex: /^[^\0]+$/ },
|
||||||
|
'value.versionId': { $exists: false },
|
||||||
|
};
|
||||||
|
|
||||||
const cmpDate = new Date();
|
const cmpDate = new Date();
|
||||||
cmpDate.setHours(cmpDate.getHours() - 1);
|
cmpDate.setHours(cmpDate.getHours() - 1);
|
||||||
|
|
||||||
cursor.forEach(
|
async.parallel({
|
||||||
res => {
|
version: done =>
|
||||||
const data = this._processEntryData(res, isTransient);
|
this._handleMongo(c, verFilter, isTransient, log, done),
|
||||||
let targetCount;
|
null: done =>
|
||||||
let targetData;
|
this._handleMongo(c, nullFilter, isTransient, log, done),
|
||||||
if (res._id.indexOf('\0') !== -1) {
|
master: done =>
|
||||||
// versioned item
|
this._handleMongo(c, mstFilter, isTransient, log, done),
|
||||||
targetCount = 'versionCount';
|
stalled: done =>
|
||||||
targetData = 'versionData';
|
this._getStalled(c, cmpDate, log, done),
|
||||||
|
}, (err, res) => {
|
||||||
if (res.value.replicationInfo.backends.length > 0 &&
|
|
||||||
this._processEntryStalled(res, cmpDate)) {
|
|
||||||
stalledCount++;
|
|
||||||
}
|
|
||||||
} else if (!!res.value.versionId) {
|
|
||||||
// master version
|
|
||||||
targetCount = 'masterCount';
|
|
||||||
targetData = 'masterData';
|
|
||||||
} else {
|
|
||||||
// null version
|
|
||||||
targetCount = 'nullCount';
|
|
||||||
targetData = 'nullData';
|
|
||||||
}
|
|
||||||
collRes[targetCount]++;
|
|
||||||
Object.keys(data).forEach(site => {
|
|
||||||
if (collRes[targetData][site]) {
|
|
||||||
collRes[targetData][site] += data[site];
|
|
||||||
} else {
|
|
||||||
collRes[targetData][site] = data[site];
|
|
||||||
}
|
|
||||||
});
|
|
||||||
},
|
|
||||||
err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('Error when processing mongo entries', {
|
|
||||||
method: 'getObjectMDStats',
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
|
const resObj = {
|
||||||
|
masterCount: res.master.count || 0,
|
||||||
|
masterData: res.master.data || {},
|
||||||
|
nullCount: res.null.count || 0,
|
||||||
|
nullData: res.null.data || {},
|
||||||
|
versionCount: res.version.count || 0,
|
||||||
|
versionData: res.version.data || {},
|
||||||
|
};
|
||||||
const bucketStatus = bucketInfo.getVersioningConfiguration();
|
const bucketStatus = bucketInfo.getVersioningConfiguration();
|
||||||
const isVer = (bucketStatus &&
|
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
|
||||||
(bucketStatus.Status === 'Enabled' ||
|
|
||||||
bucketStatus.Status === 'Suspended'));
|
bucketStatus.Status === 'Suspended'));
|
||||||
const retResult = this._handleResults(collRes, isVer);
|
const retResult = this._handleResults(resObj, isVer);
|
||||||
retResult.stalled = stalledCount;
|
retResult.stalled = res.stalled || 0;
|
||||||
return callback(null, retResult);
|
return callback(null, retResult);
|
||||||
}
|
});
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
getIngestionBuckets(log, cb) {
|
getIngestionBuckets(log, cb) {
|
||||||
|
@ -1673,4 +1748,3 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = MongoClientInterface;
|
module.exports = MongoClientInterface;
|
||||||
|
|
||||||
|
|
|
@ -1,14 +1,11 @@
|
||||||
const async = require('async');
|
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
|
const async = require('async');
|
||||||
const werelogs = require('werelogs');
|
const werelogs = require('werelogs');
|
||||||
const { MongoMemoryReplSet } = require('mongodb-memory-server');
|
const { MongoMemoryReplSet } = require('mongodb-memory-server');
|
||||||
|
|
||||||
const errors = require('../../../../lib/errors');
|
const errors = require('../../../../lib/errors');
|
||||||
const logger = new werelogs.Logger('MongoClientInterface', 'debug', 'debug');
|
const logger = new werelogs.Logger('MongoClientInterface', 'debug', 'debug');
|
||||||
const BucketInfo = require('../../../../lib/models/BucketInfo');
|
const BucketInfo = require('../../../../lib/models/BucketInfo');
|
||||||
const ObjectMD = require('../../../../lib/models/ObjectMD');
|
|
||||||
const MongoClientInterface =
|
|
||||||
require('../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
|
|
||||||
const MetadataWrapper =
|
const MetadataWrapper =
|
||||||
require('../../../../lib/storage/metadata/MetadataWrapper');
|
require('../../../../lib/storage/metadata/MetadataWrapper');
|
||||||
|
|
||||||
|
@ -317,331 +314,3 @@ describe('MongoClientInterface', () => {
|
||||||
}));
|
}));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
function isTransientFn(locationName, log, callback) {
|
|
||||||
if (locationName.startsWith('transient')) {
|
|
||||||
return callback(null, true);
|
|
||||||
}
|
|
||||||
return callback(null, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
function createBucketMD(locationName, bucketName, isVersioned, creationDate) {
|
|
||||||
const bucketObj = {
|
|
||||||
'_acl': {
|
|
||||||
'Canned': 'private',
|
|
||||||
'FULL_CONTROL': [],
|
|
||||||
'WRITE': [],
|
|
||||||
'WRITE_ACP': [],
|
|
||||||
'READ': [],
|
|
||||||
'READ_ACP': []
|
|
||||||
},
|
|
||||||
'_name': bucketName,
|
|
||||||
'_owner': 'testOwner',
|
|
||||||
'_ownerDisplayName':'testOwnerDisplayName',
|
|
||||||
'_creationDate': creationDate,
|
|
||||||
'_mdBucketModelVersion': 10,
|
|
||||||
'_transient': false,
|
|
||||||
'_deleted': false,
|
|
||||||
'_serverSideEncryption': null,
|
|
||||||
'_versioningConfiguration': isVersioned === null
|
|
||||||
? null
|
|
||||||
: { Status: isVersioned ? 'Enabled' : 'Suspended' },
|
|
||||||
'_locationConstraint': locationName,
|
|
||||||
'_readLocationConstraint': null,
|
|
||||||
'_cors': null,
|
|
||||||
'_replicationConfiguration': null,
|
|
||||||
'_lifecycleConfiguration': null,
|
|
||||||
'_uid': '',
|
|
||||||
'_isNFS': null,
|
|
||||||
'ingestion': null
|
|
||||||
};
|
|
||||||
return BucketInfo.fromObj(bucketObj);
|
|
||||||
}
|
|
||||||
|
|
||||||
function createObjectMD(params) {
|
|
||||||
const {
|
|
||||||
locationName,
|
|
||||||
objectKey,
|
|
||||||
replicationInfo,
|
|
||||||
lastModified,
|
|
||||||
size,
|
|
||||||
versionId,
|
|
||||||
} = params;
|
|
||||||
return new ObjectMD()
|
|
||||||
.setLastModified(lastModified)
|
|
||||||
.setContentLength(size)
|
|
||||||
.setDataStoreName(locationName)
|
|
||||||
.setReplicationInfo(replicationInfo)
|
|
||||||
.setVersionId(versionId)
|
|
||||||
.setKey(objectKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
function createReplicationInfo(backends, status) {
|
|
||||||
return {
|
|
||||||
status,
|
|
||||||
backends,
|
|
||||||
content: [],
|
|
||||||
destination: '',
|
|
||||||
storageClass: '',
|
|
||||||
role: '',
|
|
||||||
storageType: '',
|
|
||||||
dataStoreVersionId: '',
|
|
||||||
isNFS: null,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
describe.only('MongoClientInterface::scanItemCount', () => {
|
|
||||||
let mongoc;
|
|
||||||
before(done => {
|
|
||||||
mongoserver.waitUntilRunning().then(() => {
|
|
||||||
const opts = {
|
|
||||||
isLocationTransient: isTransientFn,
|
|
||||||
replicaSetHosts: 'localhost:27018',
|
|
||||||
writeConcern: 'majority',
|
|
||||||
replicaSet: 'rs0',
|
|
||||||
readPreference: 'primary',
|
|
||||||
database: dbName,
|
|
||||||
logger
|
|
||||||
};
|
|
||||||
mongoc = new MongoClientInterface(opts);
|
|
||||||
mongoc.setup(done);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
after(done => {
|
|
||||||
async.series([
|
|
||||||
next => mongoc.close(next),
|
|
||||||
next => mongoserver.stop()
|
|
||||||
.then(() => next())
|
|
||||||
.catch(next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should return correct count with transient location (COMPLETED)', done => {
|
|
||||||
const bucketName = 'test-bucket';
|
|
||||||
const location = 'transient-loc-1';
|
|
||||||
const createDate = new Date().toJSON();
|
|
||||||
const objectName = 'test-object';
|
|
||||||
const versionId = '00001';
|
|
||||||
const params = {
|
|
||||||
locationName: location,
|
|
||||||
objectKey: objectName,
|
|
||||||
replicationInfo: createReplicationInfo([
|
|
||||||
{site: 'loc1', status: 'COMPLETED'},
|
|
||||||
{site: 'loc2', status: 'COMPLETED'},
|
|
||||||
], 'COMPLETED'),
|
|
||||||
size: 1,
|
|
||||||
versionId,
|
|
||||||
lastModified: new Date().toJSON(),
|
|
||||||
};
|
|
||||||
const expected = {
|
|
||||||
total: { curr: 2, prev: 0 },
|
|
||||||
byLocation: {
|
|
||||||
'transient-loc-1': { curr: 0, prev: 0 },
|
|
||||||
'loc1': { curr: 1, prev: 0 },
|
|
||||||
'loc2': { curr: 1, prev: 0 },
|
|
||||||
},
|
|
||||||
};
|
|
||||||
async.series([
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, false, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: false },
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => scanAndTest(mongoc, expected, next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should return correct count with transient location (PENDING)', done => {
|
|
||||||
const bucketName = 'test-bucket';
|
|
||||||
const location = 'transient-loc-1';
|
|
||||||
const createDate = new Date().toJSON();
|
|
||||||
const objectName = 'test-object';
|
|
||||||
const versionId = '00001';
|
|
||||||
const params = {
|
|
||||||
locationName: location,
|
|
||||||
objectKey: objectName,
|
|
||||||
replicationInfo: createReplicationInfo([
|
|
||||||
{site: 'loc1', status: 'COMPLETED'},
|
|
||||||
{site: 'loc2', status: 'COMPLETED'},
|
|
||||||
], 'PENDING'),
|
|
||||||
size: 1,
|
|
||||||
versionId,
|
|
||||||
lastModified: new Date().toJSON(),
|
|
||||||
};
|
|
||||||
const expected = {
|
|
||||||
total: { curr: 3, prev: 0 },
|
|
||||||
byLocation: {
|
|
||||||
'transient-loc-1': { curr: 1, prev: 0 },
|
|
||||||
'loc1': { curr: 1, prev: 0 },
|
|
||||||
'loc2': { curr: 1, prev: 0 },
|
|
||||||
},
|
|
||||||
};
|
|
||||||
async.series([
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, false, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: false },
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => scanAndTest(mongoc, expected, next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should correctly count null objects', done => {
|
|
||||||
const bucketName = 'test-bucket';
|
|
||||||
const location = 'test-loc-1';
|
|
||||||
const createDate = new Date().toJSON();
|
|
||||||
const objectName = 'test-object';
|
|
||||||
const versionId = '00001';
|
|
||||||
const params = {
|
|
||||||
locationName: location,
|
|
||||||
objectKey: objectName,
|
|
||||||
replicationInfo: createReplicationInfo([], ''),
|
|
||||||
size: 1,
|
|
||||||
versionId,
|
|
||||||
lastModified: new Date().toJSON(),
|
|
||||||
};
|
|
||||||
const expected = {
|
|
||||||
total: { curr: 1, prev: 0 },
|
|
||||||
byLocation: {
|
|
||||||
'test-loc-1': { curr: 1, prev: 0 },
|
|
||||||
},
|
|
||||||
};
|
|
||||||
async.series([
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, false, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: false },
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, true, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => scanAndTest(mongoc, expected, next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should correctly count versioned objects', done => {
|
|
||||||
const bucketName = 'test-bucket';
|
|
||||||
const location = 'test-loc-1';
|
|
||||||
const createDate = new Date().toJSON();
|
|
||||||
const objectName = 'test-object';
|
|
||||||
const versionId = '00001';
|
|
||||||
const params = {
|
|
||||||
locationName: location,
|
|
||||||
objectKey: objectName,
|
|
||||||
replicationInfo: createReplicationInfo([], ''),
|
|
||||||
size: 1,
|
|
||||||
versionId,
|
|
||||||
lastModified: new Date().toJSON(),
|
|
||||||
};
|
|
||||||
const expected = {
|
|
||||||
total: { curr: 1, prev: 1 },
|
|
||||||
byLocation: {
|
|
||||||
'test-loc-1': { curr: 1, prev: 1 },
|
|
||||||
},
|
|
||||||
};
|
|
||||||
async.series([
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, true, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: true, versionId: '00001'},
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: true, versionId: '00002' },
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => scanAndTest(mongoc, expected, next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
|
|
||||||
it('should correctly count stalled objects', done => {
|
|
||||||
const bucketName = 'test-bucket';
|
|
||||||
const location = 'test-loc-1';
|
|
||||||
const createDate = new Date().toJSON();
|
|
||||||
const objectName = 'test-object';
|
|
||||||
const versionId = '00001';
|
|
||||||
const params = {
|
|
||||||
locationName: location,
|
|
||||||
objectKey: objectName,
|
|
||||||
replicationInfo: createReplicationInfo([], ''),
|
|
||||||
size: 1,
|
|
||||||
versionId,
|
|
||||||
lastModified: new Date().toJSON(),
|
|
||||||
};
|
|
||||||
const expected = {
|
|
||||||
total: { curr: 1, prev: 1 },
|
|
||||||
byLocation: {
|
|
||||||
'test-loc-1': { curr: 1, prev: 1 },
|
|
||||||
},
|
|
||||||
};
|
|
||||||
async.series([
|
|
||||||
next => mongoc.createBucket(
|
|
||||||
bucketName,
|
|
||||||
createBucketMD(location, bucketName, true, createDate),
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: true, versionId: '00001'},
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => mongoc.putObject(
|
|
||||||
bucketName,
|
|
||||||
objectName,
|
|
||||||
createObjectMD(params).getValue(),
|
|
||||||
{ versioning: true, versionId: '00002' },
|
|
||||||
logger,
|
|
||||||
next),
|
|
||||||
next => scanAndTest(mongoc, expected, next),
|
|
||||||
], done);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
function scanAndTest(mongoc, expected, callback) {
|
|
||||||
async.series([
|
|
||||||
next => mongoc.scanItemCount(logger, next),
|
|
||||||
next => mongoc.countItems(logger, (err, res) => {
|
|
||||||
if (err) {
|
|
||||||
return next(err);
|
|
||||||
}
|
|
||||||
console.log(res)
|
|
||||||
assert.deepStrictEqual(res.dataManaged, expected);
|
|
||||||
return next();
|
|
||||||
}),
|
|
||||||
], callback);
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,507 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const {
|
||||||
|
NEW_OBJ,
|
||||||
|
NEW_VER,
|
||||||
|
UPDATE_VER,
|
||||||
|
UPDATE_MST,
|
||||||
|
RESTORE,
|
||||||
|
DEL_VER,
|
||||||
|
DEL_MST,
|
||||||
|
DataCounter,
|
||||||
|
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');
|
||||||
|
|
||||||
|
const refZeroObj = {
|
||||||
|
objects: 0,
|
||||||
|
versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 0, prev: 0 },
|
||||||
|
byLocation: {},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const refSingleObj = {
|
||||||
|
objects: 2,
|
||||||
|
versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 200, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const refSingleObjVer = {
|
||||||
|
objects: 1,
|
||||||
|
versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 100 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const refMultiObjVer = {
|
||||||
|
objects: 1,
|
||||||
|
versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 200 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 100 },
|
||||||
|
locationTwo: { curr: 100, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const refMultiObj = {
|
||||||
|
objects: 2,
|
||||||
|
versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 400, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 200, prev: 0 },
|
||||||
|
locationTwo: { curr: 200, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
stalled: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const singleSite = size => ({
|
||||||
|
'content-length': size,
|
||||||
|
'dataStoreName': 'locationOne',
|
||||||
|
'replicationInfo': {
|
||||||
|
backends: [],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const multiSite = (size, isComplete) => ({
|
||||||
|
'content-length': size,
|
||||||
|
'dataStoreName': 'locationOne',
|
||||||
|
'replicationInfo': {
|
||||||
|
backends: [{
|
||||||
|
site: 'locationTwo',
|
||||||
|
status: isComplete ? 'COMPLETED' : 'PENDING',
|
||||||
|
}],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const transientSite = (size, status, backends) => ({
|
||||||
|
'content-length': size,
|
||||||
|
'dataStoreName': 'locationOne',
|
||||||
|
'replicationInfo': { status, backends },
|
||||||
|
});
|
||||||
|
|
||||||
|
const locationConstraints = {
|
||||||
|
locationOne: { isTransient: true },
|
||||||
|
locationTwo: { isTransient: false },
|
||||||
|
};
|
||||||
|
|
||||||
|
const dataCounter = new DataCounter();
|
||||||
|
|
||||||
|
describe('DataCounter Class', () => {
|
||||||
|
it('should create a zero object', () => {
|
||||||
|
dataCounter.set(refZeroObj);
|
||||||
|
assert.deepStrictEqual(dataCounter.results(), refZeroObj);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should skip dataCounter methods if initial values are not set', () => {
|
||||||
|
const testCounter = new DataCounter();
|
||||||
|
testCounter.addObject(singleSite(100), null, NEW_OBJ);
|
||||||
|
assert.deepStrictEqual(testCounter.results(), refZeroObj);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('DateCounter::updateTransientList', () => {
|
||||||
|
afterEach(() => dataCounter.updateTransientList({}));
|
||||||
|
it('should set transient list', () => {
|
||||||
|
assert.deepStrictEqual(dataCounter.transientList, {});
|
||||||
|
dataCounter.updateTransientList(locationConstraints);
|
||||||
|
const expectedRes = { locationOne: true, locationTwo: false };
|
||||||
|
assert.deepStrictEqual(dataCounter.transientList, expectedRes);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('DataCounter::addObject', () => {
|
||||||
|
const tests = [
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, new object one site',
|
||||||
|
init: refZeroObj,
|
||||||
|
input: [singleSite(100), null, NEW_OBJ],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, new object multi site',
|
||||||
|
init: refZeroObj,
|
||||||
|
input: [multiSite(100, true), null, NEW_OBJ],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, overwrite single site',
|
||||||
|
init: refSingleObj,
|
||||||
|
input: [singleSite(100), singleSite(50), NEW_OBJ],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 250, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 250, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, overwrite multi site',
|
||||||
|
init: refMultiObj,
|
||||||
|
input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 500, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 250, prev: 0 },
|
||||||
|
locationTwo: { curr: 250, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, new version single site',
|
||||||
|
init: refSingleObj,
|
||||||
|
input: [singleSite(100), singleSite(50), NEW_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 250, prev: 50 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 250, prev: 50 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, new version multi site',
|
||||||
|
init: refMultiObj,
|
||||||
|
input: [multiSite(100, true), multiSite(50, true), NEW_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 500, prev: 100 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 250, prev: 50 },
|
||||||
|
locationTwo: { curr: 250, prev: 50 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly ignore pending status, multi site',
|
||||||
|
init: refZeroObj,
|
||||||
|
input: [multiSite(100, false), null, NEW_OBJ],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'replication completion update in master object',
|
||||||
|
init: refSingleObj,
|
||||||
|
input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 300, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 200, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'replication completion update in versioned object',
|
||||||
|
init: refSingleObjVer,
|
||||||
|
input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 200 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 100 },
|
||||||
|
locationTwo: { curr: 0, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'restoring versioned object as master',
|
||||||
|
init: refMultiObjVer,
|
||||||
|
input: [multiSite(100, true), multiSite(100, true), RESTORE],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 2, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 400, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 200, prev: 0 },
|
||||||
|
locationTwo: { curr: 200, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
];
|
||||||
|
tests.forEach(test => it(test.it, () => {
|
||||||
|
const { expectedRes, input, init } = test;
|
||||||
|
dataCounter.set(init);
|
||||||
|
dataCounter.addObject(...input);
|
||||||
|
const testResults = dataCounter.results();
|
||||||
|
Object.keys(expectedRes).forEach(key => {
|
||||||
|
if (typeof expectedRes[key] === 'object') {
|
||||||
|
assert.deepStrictEqual(testResults[key], expectedRes[key]);
|
||||||
|
} else {
|
||||||
|
assert.strictEqual(testResults[key], expectedRes[key]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('DataCounter, update with transient location', () => {
|
||||||
|
before(() => dataCounter.updateTransientList(locationConstraints));
|
||||||
|
after(() => dataCounter.updateTransientList({}));
|
||||||
|
|
||||||
|
const pCurrMD = transientSite(100, 'PENDING', [
|
||||||
|
{ site: 'site1', status: 'PENDING' },
|
||||||
|
{ site: 'site2', status: 'COMPLETED' },
|
||||||
|
]);
|
||||||
|
const cCurrMD = transientSite(100, 'COMPLETED', [
|
||||||
|
{ site: 'site1', status: 'COMPLETED' },
|
||||||
|
{ site: 'site2', status: 'COMPLETED' },
|
||||||
|
]);
|
||||||
|
const prevMD = transientSite(100, 'PENDING', [
|
||||||
|
{ site: 'site1', status: 'PENDING' },
|
||||||
|
{ site: 'site2', status: 'PENDING' },
|
||||||
|
]);
|
||||||
|
const transientTest = [
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'version object, replication status = PENDING',
|
||||||
|
init: refSingleObjVer,
|
||||||
|
input: [pCurrMD, prevMD, UPDATE_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 200 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 100 },
|
||||||
|
site2: { curr: 0, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'version object, replication status = COMPLETED',
|
||||||
|
init: refSingleObjVer,
|
||||||
|
input: [cCurrMD, prevMD, UPDATE_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 100, prev: 200 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
site1: { curr: 0, prev: 100 },
|
||||||
|
site2: { curr: 0, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'master object, replication status = PENDING',
|
||||||
|
init: refSingleObjVer,
|
||||||
|
input: [pCurrMD, prevMD, UPDATE_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 100 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 100 },
|
||||||
|
site2: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'master object, replication status = COMPLETED',
|
||||||
|
init: refSingleObjVer,
|
||||||
|
input: [cCurrMD, prevMD, UPDATE_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 100 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 0, prev: 100 },
|
||||||
|
site1: { curr: 100, prev: 0 },
|
||||||
|
site2: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
transientTest.forEach(test => it(test.it, () => {
|
||||||
|
const { expectedRes, input, init } = test;
|
||||||
|
dataCounter.set(init);
|
||||||
|
dataCounter.addObject(...input);
|
||||||
|
const testResults = dataCounter.results();
|
||||||
|
Object.keys(expectedRes).forEach(key => {
|
||||||
|
if (typeof expectedRes[key] === 'object') {
|
||||||
|
assert.deepStrictEqual(testResults[key], expectedRes[key]);
|
||||||
|
} else {
|
||||||
|
assert.strictEqual(testResults[key], expectedRes[key]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('DataCounter::delObject', () => {
|
||||||
|
const tests = [
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'delete master object single site',
|
||||||
|
init: refMultiObj,
|
||||||
|
input: [singleSite(100), DEL_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 300, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 200, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'delete master object multi site',
|
||||||
|
init: refMultiObj,
|
||||||
|
input: [multiSite(100, true), DEL_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'delete versioned object single site',
|
||||||
|
init: refMultiObjVer,
|
||||||
|
input: [singleSite(100), DEL_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 100 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should correctly update DataCounter, ' +
|
||||||
|
'delete versioned object multi site',
|
||||||
|
init: refMultiObjVer,
|
||||||
|
input: [multiSite(100, true), DEL_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should clamp negative values to 0, master object',
|
||||||
|
init: refMultiObjVer,
|
||||||
|
input: [multiSite(300, true), DEL_MST],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 0, versions: 1,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 0, prev: 200 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 0, prev: 100 },
|
||||||
|
locationTwo: { curr: 0, prev: 100 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
it: 'should clamp negative values to 0, versioned object',
|
||||||
|
init: refMultiObjVer,
|
||||||
|
input: [multiSite(300, true), DEL_VER],
|
||||||
|
expectedRes: {
|
||||||
|
objects: 1, versions: 0,
|
||||||
|
dataManaged: {
|
||||||
|
total: { curr: 200, prev: 0 },
|
||||||
|
byLocation: {
|
||||||
|
locationOne: { curr: 100, prev: 0 },
|
||||||
|
locationTwo: { curr: 100, prev: 0 },
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
tests.forEach(test => it(test.it, () => {
|
||||||
|
const { expectedRes, input, init } = test;
|
||||||
|
dataCounter.set(init);
|
||||||
|
dataCounter.delObject(...input);
|
||||||
|
const testResults = dataCounter.results();
|
||||||
|
Object.keys(expectedRes).forEach(key => {
|
||||||
|
if (typeof expectedRes[key] === 'object') {
|
||||||
|
assert.deepStrictEqual(testResults[key], expectedRes[key]);
|
||||||
|
} else {
|
||||||
|
assert.strictEqual(testResults[key], expectedRes[key]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}));
|
||||||
|
});
|
|
@ -2,9 +2,47 @@ const assert = require('assert');
|
||||||
|
|
||||||
const MongoClientInterface = require(
|
const MongoClientInterface = require(
|
||||||
'../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
|
'../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
|
||||||
|
const DummyMongoDB = require('./utils/DummyMongoDB');
|
||||||
const DummyConfigObject = require('./utils/DummyConfigObject');
|
const DummyConfigObject = require('./utils/DummyConfigObject');
|
||||||
|
const DummyRequestLogger = require('./utils/DummyRequestLogger');
|
||||||
|
|
||||||
|
const log = new DummyRequestLogger();
|
||||||
const mongoTestClient = new MongoClientInterface({});
|
const mongoTestClient = new MongoClientInterface({});
|
||||||
|
mongoTestClient.db = new DummyMongoDB();
|
||||||
|
|
||||||
|
describe('MongoClientInterface, init behavior', () => {
|
||||||
|
let s3ConfigObj;
|
||||||
|
const locationConstraints = {
|
||||||
|
locationOne: { isTransient: true },
|
||||||
|
locationTwo: { isTransient: false },
|
||||||
|
};
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
s3ConfigObj = new DummyConfigObject();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should set DataCounter transientList when declaring a ' +
|
||||||
|
'new MongoClientInterface object', () => {
|
||||||
|
s3ConfigObj.setLocationConstraints(locationConstraints);
|
||||||
|
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
|
||||||
|
const expectedRes = { locationOne: true, locationTwo: false };
|
||||||
|
assert.deepStrictEqual(
|
||||||
|
mongoClient.dataCount.transientList, expectedRes);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should update DataCounter transientList if location constraints ' +
|
||||||
|
'are updated', done => {
|
||||||
|
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
|
||||||
|
assert.deepStrictEqual(mongoClient.dataCount.transientList, {});
|
||||||
|
const expectedRes = { locationOne: true, locationTwo: false };
|
||||||
|
s3ConfigObj.once('MongoClientTestDone', () => {
|
||||||
|
assert.deepStrictEqual(
|
||||||
|
mongoClient.dataCount.transientList, expectedRes);
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
s3ConfigObj.setLocationConstraints(locationConstraints);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('MongoClientInterface::_handleResults', () => {
|
describe('MongoClientInterface::_handleResults', () => {
|
||||||
it('should return zero-result', () => {
|
it('should return zero-result', () => {
|
||||||
|
@ -65,6 +103,140 @@ describe('MongoClientInterface::_handleResults', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('MongoClientInterface::_handleMongo', () => {
|
||||||
|
beforeEach(() => mongoTestClient.db.reset());
|
||||||
|
|
||||||
|
it('should return error if mongo aggregate fails', done => {
|
||||||
|
const retValues = [new Error('testError')];
|
||||||
|
mongoTestClient.db.setReturnValues(retValues);
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, false, log, err => {
|
||||||
|
assert(err, 'Expected error, but got success');
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return empty object if mongo aggregate has no results', done => {
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, false, log,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err, `Expected success, but got error ${err}`);
|
||||||
|
assert.deepStrictEqual(res, {});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return empty object if mongo aggregate has missing results',
|
||||||
|
done => {
|
||||||
|
const retValues = [[{
|
||||||
|
count: undefined,
|
||||||
|
data: undefined,
|
||||||
|
repData: undefined,
|
||||||
|
}]];
|
||||||
|
mongoTestClient.db.setReturnValues(retValues);
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, false, log,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err, `Expected success, but got error ${err}`);
|
||||||
|
assert.deepStrictEqual(res, {});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const testRetValues = [[{
|
||||||
|
count: [{ _id: null, count: 100 }],
|
||||||
|
data: [
|
||||||
|
{ _id: 'locationone', bytes: 1000 },
|
||||||
|
{ _id: 'locationtwo', bytes: 1000 },
|
||||||
|
],
|
||||||
|
repData: [
|
||||||
|
{ _id: 'awsbackend', bytes: 500 },
|
||||||
|
{ _id: 'azurebackend', bytes: 500 },
|
||||||
|
{ _id: 'gcpbackend', bytes: 500 },
|
||||||
|
],
|
||||||
|
compData: [
|
||||||
|
{ _id: 'locationone', bytes: 500 },
|
||||||
|
{ _id: 'locationtwo', bytes: 500 },
|
||||||
|
],
|
||||||
|
}]];
|
||||||
|
|
||||||
|
it('should return correct results, transient false', done => {
|
||||||
|
mongoTestClient.db.setReturnValues(testRetValues);
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, false, log,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err, `Expected success, but got error ${err}`);
|
||||||
|
assert.deepStrictEqual(res, {
|
||||||
|
count: 100,
|
||||||
|
data: {
|
||||||
|
locationone: 1000,
|
||||||
|
locationtwo: 1000,
|
||||||
|
awsbackend: 500,
|
||||||
|
azurebackend: 500,
|
||||||
|
gcpbackend: 500,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return correct results, transient true', done => {
|
||||||
|
mongoTestClient.db.setReturnValues(testRetValues);
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, true, log,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err, `Expected success, but got error ${err}`);
|
||||||
|
assert.deepStrictEqual(res, {
|
||||||
|
count: 100,
|
||||||
|
data: {
|
||||||
|
locationone: 500,
|
||||||
|
locationtwo: 500,
|
||||||
|
awsbackend: 500,
|
||||||
|
azurebackend: 500,
|
||||||
|
gcpbackend: 500,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
const testRetValuesNeg = [[{
|
||||||
|
count: [{ _id: null, count: 100 }],
|
||||||
|
data: [
|
||||||
|
{ _id: 'locationone', bytes: 100 },
|
||||||
|
{ _id: 'locationtwo', bytes: 100 },
|
||||||
|
],
|
||||||
|
repData: [
|
||||||
|
{ _id: 'awsbackend', bytes: 500 },
|
||||||
|
{ _id: 'azurebackend', bytes: 500 },
|
||||||
|
{ _id: 'gcpbackend', bytes: 500 },
|
||||||
|
],
|
||||||
|
compData: [
|
||||||
|
{ _id: 'locationone', bytes: 500 },
|
||||||
|
{ _id: 'locationtwo', bytes: 500 },
|
||||||
|
],
|
||||||
|
}]];
|
||||||
|
it('should return clamp negative values to 0', done => {
|
||||||
|
mongoTestClient.db.setReturnValues(testRetValuesNeg);
|
||||||
|
const testCollection = mongoTestClient.db.collection('test');
|
||||||
|
mongoTestClient._handleMongo(testCollection, {}, true, log,
|
||||||
|
(err, res) => {
|
||||||
|
assert.ifError(err, `Expected success, but got error ${err}`);
|
||||||
|
assert.deepStrictEqual(res, {
|
||||||
|
count: 100,
|
||||||
|
data: {
|
||||||
|
locationone: 0,
|
||||||
|
locationtwo: 0,
|
||||||
|
awsbackend: 500,
|
||||||
|
azurebackend: 500,
|
||||||
|
gcpbackend: 500,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('MongoClientInterface, misc', () => {
|
describe('MongoClientInterface, misc', () => {
|
||||||
let s3ConfigObj;
|
let s3ConfigObj;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,103 @@
|
||||||
|
const testError = new Error('test error');
|
||||||
|
|
||||||
|
class DummyCollection {
|
||||||
|
constructor(name, isFail) {
|
||||||
|
this.s = {
|
||||||
|
name,
|
||||||
|
};
|
||||||
|
this.fail = isFail;
|
||||||
|
this.retQueue = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
setReturnValues(retArray) {
|
||||||
|
this.retQueue.push(...retArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregate() {
|
||||||
|
return {
|
||||||
|
toArray: cb => {
|
||||||
|
if (this.retQueue.length <= 0) {
|
||||||
|
return cb(null, []);
|
||||||
|
}
|
||||||
|
const retVal = this.retQueue[0];
|
||||||
|
this.retQueue = this.retQueue.slice(1);
|
||||||
|
if (retVal instanceof Error) {
|
||||||
|
return cb(retVal);
|
||||||
|
}
|
||||||
|
return cb(null, retVal);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
bulkWrite(cmds, opt, cb) {
|
||||||
|
process.stdout.write('mock mongodb.bulkWrite call\n');
|
||||||
|
if (this.fail) {
|
||||||
|
return cb(testError);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
|
||||||
|
update(target, doc, opt, cb) {
|
||||||
|
process.stdout.write('mock mongodb.update call\n');
|
||||||
|
if (this.fail) {
|
||||||
|
return cb(testError);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
|
||||||
|
find() {
|
||||||
|
return {
|
||||||
|
toArray: cb => {
|
||||||
|
if (this.retQueue.length <= 0) {
|
||||||
|
return cb(null, []);
|
||||||
|
}
|
||||||
|
const retVal = this.retQueue[0];
|
||||||
|
this.retQueue = this.retQueue.slice(1);
|
||||||
|
if (retVal instanceof Error) {
|
||||||
|
return cb(retVal);
|
||||||
|
}
|
||||||
|
return cb(null, retVal);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
findOne(query, opt, cb) {
|
||||||
|
if (typeof opt === 'function' && cb === undefined) {
|
||||||
|
// eslint-disable-next-line no-param-reassign
|
||||||
|
cb = opt;
|
||||||
|
}
|
||||||
|
if (this.retQueue.length <= 0) {
|
||||||
|
return cb(null);
|
||||||
|
}
|
||||||
|
const retVal = this.retQueue[0];
|
||||||
|
this.retQueue = this.retQueue.slice(1);
|
||||||
|
if (retVal instanceof Error) {
|
||||||
|
return cb(retVal);
|
||||||
|
}
|
||||||
|
return cb(null, retVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyMongoDB {
|
||||||
|
contructor() {
|
||||||
|
this.fail = false;
|
||||||
|
this.returnQueue = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
reset() {
|
||||||
|
this.fail = false;
|
||||||
|
this.returnQueue = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
setReturnValues(retValues) {
|
||||||
|
this.returnQueue.push(...retValues);
|
||||||
|
}
|
||||||
|
|
||||||
|
collection(name) {
|
||||||
|
const c = new DummyCollection(name, this.fail);
|
||||||
|
c.setReturnValues(this.returnQueue);
|
||||||
|
return c;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = DummyMongoDB;
|
Loading…
Reference in New Issue