Compare commits
6 Commits
developmen ... wip/countI

Author | SHA1 | Date
---|---|---
Alexander Chan | 2f8af6352c |
Alexander Chan | eb79743c44 |
Alexander Chan | a9bedee48b |
Alexander Chan | 239060765c |
Alexander Chan | f7d0406d10 |
Alexander Chan | c3389da37e |
@@ -11,7 +11,6 @@
*/
const async = require('async');

const { EventEmitter } = require('events');
const constants = require('../../../constants');

const { reshapeExceptionError } = require('../../../errorUtils');

@@ -27,7 +26,6 @@ const listAlgos = require('../../../algos/list/exportAlgos');

const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils');
const { DataCounter } = require('./DataCounter');
const Skip = require('../../../algos/list/skip');

const USERSBUCKET = '__usersbucket';

@@ -40,6 +38,7 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default
const SOCKET_TIMEOUT_MS = 360000;
const CONCURRENT_CURSORS = 10;

const initialInstanceID = process.env.INITIAL_INSTANCE_ID;

@@ -92,7 +91,7 @@ function generatePHDVersion(versionId) {
class MongoClientInterface {
constructor(params) {
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
database, logger, replicationGroupId, authCredentials, config,
database, logger, replicationGroupId, authCredentials,
isLocationTransient } = params;
const cred = MongoUtils.credPrefix(authCredentials);
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +

@@ -104,20 +103,7 @@ class MongoClientInterface {
this.path = path;
this.replicationGroupId = replicationGroupId;
this.database = database;
this.dataCount = new DataCounter();
this.isLocationTransient = isLocationTransient;

if (config && config instanceof EventEmitter) {
this.config = config;
this.config.on('location-constraints-update', () => {
this.dataCount
.updateTransientList(this.config.locationConstraints);
if (this.config.isTest) {
this.config.emit('MongoClientTestDone');
}
});
this.dataCount.updateTransientList(this.config.locationConstraints);
}
}

setup(cb) {

@@ -136,6 +122,10 @@ class MongoClientInterface {
const socketTimeoutMS = Number.parseInt(
process.env.MONGO_SOCKET_TIMEOUT_MS, 10) || SOCKET_TIMEOUT_MS;
const options = { connectTimeoutMS, socketTimeoutMS };
if (process.env.MONGO_POOL_SIZE &&
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
}
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
if (err) {
this.logger.error('error connecting to mongodb',
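This is where the two branches diverge most in setup: the `wip/countI` side drops `this.dataCount` and the `config`/EventEmitter wiring that kept the `DataCounter` transient-location list in sync with `location-constraints-update` events. A minimal standalone sketch of the pattern being removed, using a plain `EventEmitter` as a stand-in for the cloudserver config object (the names below are illustrative, not part of the diff):

```js
const { EventEmitter } = require('events');

// Stand-in for the S3 config object (illustrative only).
const config = new EventEmitter();
config.locationConstraints = {};

const transientList = {};
function updateTransientList(locationConstraints) {
    // Flatten each location's isTransient flag into a lookup table,
    // mirroring what DataCounter.updateTransientList did.
    Object.keys(locationConstraints).forEach(loc => {
        transientList[loc] = Boolean(locationConstraints[loc].isTransient);
    });
}

// Same shape as the removed constructor block: re-sync on every update.
config.on('location-constraints-update', () =>
    updateTransientList(config.locationConstraints));

config.locationConstraints = { locationOne: { isTransient: true } };
config.emit('location-constraints-update');
console.log(transientList); // { locationOne: true }
```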
@@ -1149,7 +1139,7 @@ class MongoClientInterface {
* get bucket related information for count items, used by cloudserver
* and s3utils
*/
_getBucketInfos(log, cb) {
getBucketInfos(log, cb) {
let bucketCount = 0;
const bucketInfos = [];

@@ -1198,7 +1188,7 @@ class MongoClientInterface {
}

countItems(log, cb) {
this._getBucketInfos(log, (err, res) => {
this.getBucketInfos(log, (err, res) => {
if (err) {
log.error('error getting bucket info', {
method: 'countItems',

@@ -1230,6 +1220,25 @@ class MongoClientInterface {
});
}

consolidateData(store, dataManaged) {
if (dataManaged && dataManaged.locations && dataManaged.total) {
const locations = dataManaged.locations;
store.dataManaged.total.curr += dataManaged.total.curr;
store.dataManaged.total.prev += dataManaged.total.prev;
Object.keys(locations).forEach(site => {
if (!store.dataManaged.byLocation[site]) {
store.dataManaged.byLocation[site] =
Object.assign({}, locations[site]);
} else {
store.dataManaged.byLocation[site].curr +=
locations[site].curr;
store.dataManaged.byLocation[site].prev +=
locations[site].prev;
}
});
}
}

scanItemCount(log, cb) {
const store = {
objects: 0,
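Hoisting `consolidateData` out of `scanItemCount` (see the next hunk) into a method makes the merge reusable and unit-testable without a full scan. A standalone sketch of what it does to the shared store, using made-up byte counts:

```js
const store = { dataManaged: { total: { curr: 0, prev: 0 }, byLocation: {} } };

function consolidateData(store, dataManaged) {
    if (dataManaged && dataManaged.locations && dataManaged.total) {
        const locations = dataManaged.locations;
        store.dataManaged.total.curr += dataManaged.total.curr;
        store.dataManaged.total.prev += dataManaged.total.prev;
        Object.keys(locations).forEach(site => {
            if (!store.dataManaged.byLocation[site]) {
                // First time this site is seen: copy its counters in.
                store.dataManaged.byLocation[site] =
                    Object.assign({}, locations[site]);
            } else {
                store.dataManaged.byLocation[site].curr += locations[site].curr;
                store.dataManaged.byLocation[site].prev += locations[site].prev;
            }
        });
    }
}

// One bucket's per-site byte counts get folded into the global store.
consolidateData(store, {
    total: { curr: 100, prev: 50 },
    locations: { locationOne: { curr: 100, prev: 50 } },
});
console.log(store.dataManaged);
// { total: { curr: 100, prev: 50 },
//   byLocation: { locationOne: { curr: 100, prev: 50 } } }
```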
@@ -1243,26 +1252,10 @@ class MongoClientInterface {
stalled: 0,
};

const consolidateData = dataManaged => {
if (dataManaged && dataManaged.locations && dataManaged.total) {
const locations = dataManaged.locations;
store.dataManaged.total.curr += dataManaged.total.curr;
store.dataManaged.total.prev += dataManaged.total.prev;
Object.keys(locations).forEach(site => {
if (!store.dataManaged.byLocation[site]) {
store.dataManaged.byLocation[site] =
Object.assign({}, locations[site]);
} else {
store.dataManaged.byLocation[site].curr +=
locations[site].curr;
store.dataManaged.byLocation[site].prev +=
locations[site].prev;
}
});
}
};
const consolidateData = (dataManaged) =>
this.consolidateData(store, dataManaged);

this._getBucketInfos(log, (err, res) => {
this.getBucketInfos(log, (err, res) => {
if (err) {
log.error('error getting bucket info', {
method: 'scanItemCount',

@@ -1283,42 +1276,50 @@ class MongoClientInterface {
store.buckets = bucketCount;
store.bucketList = retBucketInfos;

return async.eachLimit(bucketInfos, 10, (bucketInfo, done) => {
async.waterfall([
next => this._getIsTransient(bucketInfo, log, next),
(isTransient, next) => {
const bucketName = bucketInfo.getName();
this.getObjectMDStats(bucketName, bucketInfo,
isTransient, log, next);
},
], (err, results) => {
let concurrentCursors = CONCURRENT_CURSORS;
if (process.env.CONCURRENT_CURSORS &&
!Number.isNaN(process.env.CONCURRENT_CURSORS)) {
concurrentCursors = Number
.parseInt(process.env.CONCURRENT_CURSORS, 10);
}

return async.eachLimit(bucketInfos, concurrentCursors,
(bucketInfo, done) => {
async.waterfall([
next => this._getIsTransient(bucketInfo, log, next),
(isTransient, next) => {
const bucketName = bucketInfo.getName();
this.getObjectMDStats(bucketName, bucketInfo,
isTransient, log, next);
},
], (err, results) => {
if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) {
return cb(err);
}
// save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
return cb(null, store);
// save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
return cb(null, store);
});
});
});
});
return undefined;
}
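One caveat on both environment checks in this diff (`MONGO_POOL_SIZE` in `setup` and `CONCURRENT_CURSORS` here): `Number.isNaN` never coerces, so it returns `false` for every string, and `!Number.isNaN(process.env.X)` therefore always passes; a non-numeric value then reaches `Number.parseInt` and yields `NaN`. If validation is the intent, parsing first would be stricter (a sketch, not part of the diff):

```js
// Parse first, then validate the parsed number.
const parsed = Number.parseInt(process.env.CONCURRENT_CURSORS, 10);
const concurrentCursors = Number.isNaN(parsed) ? CONCURRENT_CURSORS : parsed;
```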
@@ -1330,7 +1331,6 @@ class MongoClientInterface {
this.isLocationTransient(locConstraint, log, cb);
return;
}

this._pensieveLocationIsTransient(locConstraint, log, cb);
}

@@ -1415,211 +1415,136 @@ class MongoClientInterface {
};
}

_handleCount(entry) {
return entry && entry.count > 0 ? entry.count : 0;
}

_handleEntries(entries) {
/**
* @param{object} entry -
* @param{string} entry._id -
* @param{object} entry.value -
* @param{boolean} isTransient -
* @returns{object.<string, number>} results -
*/
_processEntryData(entry, isTransient) {
const results = {};
if (entries) {
entries.forEach(entry => {
results[entry._id] = entry.bytes;
});
if (!isTransient ||
entry.value.replicationInfo.status !== 'COMPLETED') {
if (results[entry.value.dataStoreName]) {
results[entry.value.dataStoreName] +=
entry.value['content-length'];
} else {
results[entry.value.dataStoreName] =
entry.value['content-length'];
}
} else {
if (!results[entry.value.dataStoreName]) {
results[entry.value.dataStoreName] = 0;
}
}
entry.value.replicationInfo.backends.forEach(rep => {
if (rep.status === 'COMPLETED') {
if (results[rep.site]) {
results[rep.site] += entry.value['content-length'];
} else {
results[rep.site] = entry.value['content-length'];
}
}
});
return results;
}

_handleMongo(c, filter, isTransient, log, cb) {
const reducedFields = {
'_id': 1,
'value.versionId': 1,
'value.replicationInfo.status': 1,
'value.replicationInfo.backends': 1,
'value.content-length': 1,
'value.dataStoreName': 1,
};

const aggCount = [
{ $project: { _id: 1 } },
{ $group: { _id: null, count: { $sum: 1 } } },
];

const aggData = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
} },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];

const aggRepData = [
{ $project: {
'value.replicationInfo.backends': 1,
'value.content-length': 1,
} },
{ $unwind: '$value.replicationInfo.backends' },
{ $match: {
'value.replicationInfo.backends.status': { $eq: 'COMPLETED' },
} },
{ $group: {
_id: '$value.replicationInfo.backends.site',
bytes: { $sum: '$value.content-length' },
} },
];

const aggCompleted = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
'inComplete': {
$eq: ['$value.replicationInfo.status', 'COMPLETED'],
},
} },
{ $match: { inComplete: true } },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];

return c.aggregate([
{ $project: reducedFields },
{ $match: filter },
{ $facet: {
count: aggCount,
data: aggData,
repData: aggRepData,
compData: aggCompleted,
} },
]).toArray((err, res) => {
if (err) {
log.error('Error when processing mongo entries', {
method: '_handleMongo',
error: err,
});
return cb(err);
}
if (!res || res.length < 1) {
log.debug('aggregate returned empty results', {
method: '_handleMongo',
});
return cb(null, {});
}
const agg = res[0];
if (!agg || !agg.count || !agg.data || !agg.repData) {
log.debug('missing field in aggregate results', {
method: '_handleMongo',
});
return cb(null, {});
}
const retResult = {
count: this._handleCount(agg.count[0] || {}),
data: this._handleEntries(agg.data),
};
const repDataEntries = this._handleEntries(agg.repData);
Object.keys(repDataEntries).forEach(site => {
if (!retResult.data[site]) {
retResult.data[site] = 0;
}
retResult.data[site] += repDataEntries[site];
});
if (isTransient && agg.compData) {
const compDataEntries = this._handleEntries(agg.compData);
Object.keys(compDataEntries).forEach(site => {
if (retResult.data[site]) {
retResult.data[site] -= compDataEntries[site];
retResult.data[site] =
Math.max(0, retResult.data[site]);
}
});
}
return cb(null, retResult);
});
}

_getStalled(c, cmpDate, log, cb) {
const reducedFields = {
'_id': {
id: '$_id',
status: '$value.replicationInfo.status',
},
'value.last-modified': 1,
};
return c.aggregate([
{ $project: reducedFields },
{ $match: {
'_id.id': { $regex: /\0/ },
'_id.status': { $eq: 'PENDING' },
} },
]).toArray((err, res) => {
if (err) {
log.debug('Unable to retrieve stalled entries', {
error: err,
});
return cb(null);
}
const count = res.filter(data => {
if (!data || typeof data !== 'object' ||
!data.value || typeof data.value !== 'object') {
return false;
}
const time = data.value['last-modified'] || null;
if (isNaN(Date.parse(time))) {
return false;
}
const testDate = new Date(time);
return testDate <= cmpDate;
}).length;
return cb(null, count);
});
/**
* @param{object} entry -
* @param{string} entry._id -
* @param{object} entry.value -
* @param{Date} cmpDate -
* @returns{boolean} stalled -
*/
_processEntryStalled(entry, cmpDate) {
if (entry.value.replicationInfo.status === 'PENDING') {
return false;
}
const lastModified = Date.parse(entry.value['last-modified'] || null);
if (isNaN(lastModified) || new Date(lastModified) > cmpDate) {
return false;
}
return true;
}

/*
* scan and process a single collection (bucket)
*/
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
const c = this.getCollection(bucketName);
const mstFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: true },
const cursor = c.find({}, {
sort: { _id: 1 },
projection: {
'_id': 1,
'value.last-modified': 1,
'value.replicationInfo': 1,
'value.dataStoreName': 1,
'value.content-length': 1,
'value.versionId': 1,
},
});
const collRes = {
masterCount: 0,
masterData: {},
nullCount: 0,
nullData: {},
versionCount: 0,
versionData: {},
};
const verFilter = { _id: { $regex: /\0/ } };
const nullFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: false },
};

let stalledCount = 0;
const cmpDate = new Date();
cmpDate.setHours(cmpDate.getHours() - 1);

async.parallel({
version: done =>
this._handleMongo(c, verFilter, isTransient, log, done),
null: done =>
this._handleMongo(c, nullFilter, isTransient, log, done),
master: done =>
this._handleMongo(c, mstFilter, isTransient, log, done),
stalled: done =>
this._getStalled(c, cmpDate, log, done),
}, (err, res) => {
if (err) {
return callback(err);
cursor.forEach(
res => {
const data = this._processEntryData(res, isTransient);
let targetCount;
let targetData;
if (res._id.indexOf('\0') !== -1) {
// versioned item
targetCount = 'versionCount';
targetData = 'versionData';

if (res.value.replicationInfo.backends.length > 0 &&
this._processEntryStalled(res, cmpDate)) {
stalledCount++;
}
} else if (!!res.value.versionId) {
// master version
targetCount = 'masterCount';
targetData = 'masterData';
} else {
// null version
targetCount = 'nullCount';
targetData = 'nullData';
}
collRes[targetCount]++;
Object.keys(data).forEach(site => {
if (collRes[targetData][site]) {
collRes[targetData][site] += data[site];
} else {
collRes[targetData][site] = data[site];
}
});
},
err => {
if (err) {
log.error('Error when processing mongo entries', {
method: 'getObjectMDStats',
error: err,
});
return callback(err);
}
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus &&
(bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(collRes, isVer);
retResult.stalled = stalledCount;
return callback(null, retResult);
}
const resObj = {
masterCount: res.master.count || 0,
masterData: res.master.data || {},
nullCount: res.null.count || 0,
nullData: res.null.data || {},
versionCount: res.version.count || 0,
versionData: res.version.data || {},
};
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(resObj, isVer);
retResult.stalled = res.stalled || 0;
return callback(null, retResult);
});
);
}

getIngestionBuckets(log, cb) {
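The hunk above is the core of the rewrite: the `$facet` aggregation (`aggCount`/`aggData`/`aggRepData`/`aggCompleted`) and its `async.parallel` driver go away, replaced by one sorted `find` cursor per bucket, with the per-entry byte attribution done in JavaScript by `_processEntryData`. A condensed, runnable sketch of that attribution logic applied to one sample entry (sample metadata only; behavior taken from the diff):

```js
function processEntryData(entry, isTransient) {
    const results = {};
    // Count bytes at the source location unless the location is transient
    // and replication has fully COMPLETED (the data has moved off it).
    if (!isTransient ||
        entry.value.replicationInfo.status !== 'COMPLETED') {
        results[entry.value.dataStoreName] =
            (results[entry.value.dataStoreName] || 0) +
            entry.value['content-length'];
    } else if (!results[entry.value.dataStoreName]) {
        results[entry.value.dataStoreName] = 0;
    }
    // Every COMPLETED replication backend also holds a copy.
    entry.value.replicationInfo.backends.forEach(rep => {
        if (rep.status === 'COMPLETED') {
            results[rep.site] = (results[rep.site] || 0) +
                entry.value['content-length'];
        }
    });
    return results;
}

const entry = {
    _id: 'key\u000098',
    value: {
        'dataStoreName': 'transient-loc-1',
        'content-length': 1,
        'replicationInfo': {
            status: 'COMPLETED',
            backends: [{ site: 'loc1', status: 'COMPLETED' }],
        },
    },
};
console.log(processEntryData(entry, true));
// { 'transient-loc-1': 0, loc1: 1 }
```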
@@ -1748,3 +1673,4 @@ class MongoClientInterface {
}

module.exports = MongoClientInterface;

@@ -1,11 +1,14 @@
const assert = require('assert');
const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs');
const { MongoMemoryReplSet } = require('mongodb-memory-server');

const errors = require('../../../../lib/errors');
const logger = new werelogs.Logger('MongoClientInterface', 'debug', 'debug');
const BucketInfo = require('../../../../lib/models/BucketInfo');
const ObjectMD = require('../../../../lib/models/ObjectMD');
const MongoClientInterface =
require('../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const MetadataWrapper =
require('../../../../lib/storage/metadata/MetadataWrapper');

@@ -314,3 +317,331 @@ describe('MongoClientInterface', () => {
}));
});
});

function isTransientFn(locationName, log, callback) {
if (locationName.startsWith('transient')) {
return callback(null, true);
}
return callback(null, false);
}

function createBucketMD(locationName, bucketName, isVersioned, creationDate) {
const bucketObj = {
'_acl': {
'Canned': 'private',
'FULL_CONTROL': [],
'WRITE': [],
'WRITE_ACP': [],
'READ': [],
'READ_ACP': []
},
'_name': bucketName,
'_owner': 'testOwner',
'_ownerDisplayName':'testOwnerDisplayName',
'_creationDate': creationDate,
'_mdBucketModelVersion': 10,
'_transient': false,
'_deleted': false,
'_serverSideEncryption': null,
'_versioningConfiguration': isVersioned === null
? null
: { Status: isVersioned ? 'Enabled' : 'Suspended' },
'_locationConstraint': locationName,
'_readLocationConstraint': null,
'_cors': null,
'_replicationConfiguration': null,
'_lifecycleConfiguration': null,
'_uid': '',
'_isNFS': null,
'ingestion': null
};
return BucketInfo.fromObj(bucketObj);
}

function createObjectMD(params) {
const {
locationName,
objectKey,
replicationInfo,
lastModified,
size,
versionId,
} = params;
return new ObjectMD()
.setLastModified(lastModified)
.setContentLength(size)
.setDataStoreName(locationName)
.setReplicationInfo(replicationInfo)
.setVersionId(versionId)
.setKey(objectKey);
}

function createReplicationInfo(backends, status) {
return {
status,
backends,
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
isNFS: null,
};
}

describe.only('MongoClientInterface::scanItemCount', () => {
let mongoc;
before(done => {
mongoserver.waitUntilRunning().then(() => {
const opts = {
isLocationTransient: isTransientFn,
replicaSetHosts: 'localhost:27018',
writeConcern: 'majority',
replicaSet: 'rs0',
readPreference: 'primary',
database: dbName,
logger
};
mongoc = new MongoClientInterface(opts);
mongoc.setup(done);
});
});

after(done => {
async.series([
next => mongoc.close(next),
next => mongoserver.stop()
.then(() => next())
.catch(next),
], done);
});

it('should return correct count with transient location (COMPLETED)', done => {
const bucketName = 'test-bucket';
const location = 'transient-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([
{site: 'loc1', status: 'COMPLETED'},
{site: 'loc2', status: 'COMPLETED'},
], 'COMPLETED'),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 2, prev: 0 },
byLocation: {
'transient-loc-1': { curr: 0, prev: 0 },
'loc1': { curr: 1, prev: 0 },
'loc2': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});

it('should return correct count with transient location (PENDING)', done => {
const bucketName = 'test-bucket';
const location = 'transient-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([
{site: 'loc1', status: 'COMPLETED'},
{site: 'loc2', status: 'COMPLETED'},
], 'PENDING'),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 3, prev: 0 },
byLocation: {
'transient-loc-1': { curr: 1, prev: 0 },
'loc1': { curr: 1, prev: 0 },
'loc2': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});

it('should correctly count null objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 0 },
byLocation: {
'test-loc-1': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});

it('should correctly count versioned objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 1 },
byLocation: {
'test-loc-1': { curr: 1, prev: 1 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00001'},
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00002' },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});

it('should correctly count stalled objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 1 },
byLocation: {
'test-loc-1': { curr: 1, prev: 1 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00001'},
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00002' },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
});

function scanAndTest(mongoc, expected, callback) {
async.series([
next => mongoc.scanItemCount(logger, next),
next => mongoc.countItems(logger, (err, res) => {
if (err) {
return next(err);
}
console.log(res)
assert.deepStrictEqual(res.dataManaged, expected);
return next();
}),
], callback);
}
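`scanAndTest` exercises the full round trip: `scanItemCount` walks every bucket and persists the totals via `updateCountItems`, then `countItems` reads them back for the assertion (the bare `console.log(res)` above looks like leftover debugging, and the `describe.only` would suppress the rest of the suite if merged as-is). A sketch of the result shape the assertion inspects, with the field list taken from the store built in `scanItemCount` and illustrative values:

```js
// Shape of the countItems result asserted via res.dataManaged.
const res = {
    objects: 1,
    versions: 0,
    buckets: 1,
    bucketList: [/* per-bucket info */],
    dataManaged: {
        total: { curr: 1, prev: 0 },
        byLocation: { 'test-loc-1': { curr: 1, prev: 0 } },
    },
    stalled: 0,
};
```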
@@ -1,507 +0,0 @@
const assert = require('assert');

const {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');

const refZeroObj = {
objects: 0,
versions: 0,
dataManaged: {
total: { curr: 0, prev: 0 },
byLocation: {},
},
stalled: 0,
};

const refSingleObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
},
},
stalled: 0,
};

const refSingleObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 100, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
},
},
stalled: 0,
};

const refMultiObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 200, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 100, prev: 100 },
},
},
stalled: 0,
};

const refMultiObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
stalled: 0,
};

const singleSite = size => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [],
},
});

const multiSite = (size, isComplete) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [{
site: 'locationTwo',
status: isComplete ? 'COMPLETED' : 'PENDING',
}],
},
});

const transientSite = (size, status, backends) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': { status, backends },
});

const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};

const dataCounter = new DataCounter();

describe('DataCounter Class', () => {
it('should create a zero object', () => {
dataCounter.set(refZeroObj);
assert.deepStrictEqual(dataCounter.results(), refZeroObj);
});

it('should skip dataCounter methods if initial values are not set', () => {
const testCounter = new DataCounter();
testCounter.addObject(singleSite(100), null, NEW_OBJ);
assert.deepStrictEqual(testCounter.results(), refZeroObj);
});
});

describe('DateCounter::updateTransientList', () => {
afterEach(() => dataCounter.updateTransientList({}));
it('should set transient list', () => {
assert.deepStrictEqual(dataCounter.transientList, {});
dataCounter.updateTransientList(locationConstraints);
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(dataCounter.transientList, expectedRes);
});
});

describe('DataCounter::addObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, new object one site',
init: refZeroObj,
input: [singleSite(100), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new object multi site',
init: refZeroObj,
input: [multiSite(100, true), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 250, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 500, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
locationTwo: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 250, prev: 50 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 500, prev: 100 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
locationTwo: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly ignore pending status, multi site',
init: refZeroObj,
input: [multiSite(100, false), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in master object',
init: refSingleObj,
input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in versioned object',
init: refSingleObjVer,
input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'restoring versioned object as master',
init: refMultiObjVer,
input: [multiSite(100, true), multiSite(100, true), RESTORE],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});

describe('DataCounter, update with transient location', () => {
before(() => dataCounter.updateTransientList(locationConstraints));
after(() => dataCounter.updateTransientList({}));

const pCurrMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'COMPLETED' },
]);
const cCurrMD = transientSite(100, 'COMPLETED', [
{ site: 'site1', status: 'COMPLETED' },
{ site: 'site2', status: 'COMPLETED' },
]);
const prevMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'PENDING' },
]);
const transientTest = [
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
site1: { curr: 0, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
site1: { curr: 100, prev: 0 },
site2: { curr: 100, prev: 0 },
},
},
},
},
];

transientTest.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});

describe('DataCounter::delObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, ' +
'delete master object single site',
init: refMultiObj,
input: [singleSite(100), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete master object multi site',
init: refMultiObj,
input: [multiSite(100, true), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object single site',
init: refMultiObjVer,
input: [singleSite(100), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object multi site',
init: refMultiObjVer,
input: [multiSite(100, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should clamp negative values to 0, master object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_MST],
expectedRes: {
objects: 0, versions: 1,
dataManaged: {
total: { curr: 0, prev: 200 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should clamp negative values to 0, versioned object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
];

tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.delObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
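The entire DataCounter unit suite above disappears with the class itself. For orientation, the convention its fixtures encode: `curr` counts bytes held by current (master) versions and `prev` counts bytes held by superseded versions, per site and in `total`. A toy sketch of the NEW_VER transition that matches the 'new version single site' fixture above (the helper name is invented):

```js
// NEW_VER: a newSize-byte master replaces an oldSize-byte master;
// the old master's bytes move from curr to prev.
function applyNewVersion(counts, newSize, oldSize) {
    counts.total.curr += newSize - oldSize;
    counts.total.prev += oldSize;
    return counts;
}
// refSingleObj starts at curr 200 / prev 0; input sizes are [100, 50]:
console.log(applyNewVersion({ total: { curr: 200, prev: 0 } }, 100, 50));
// { total: { curr: 250, prev: 50 } }
```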
@@ -2,47 +2,9 @@ const assert = require('assert');

const MongoClientInterface = require(
'../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const DummyMongoDB = require('./utils/DummyMongoDB');
const DummyConfigObject = require('./utils/DummyConfigObject');
const DummyRequestLogger = require('./utils/DummyRequestLogger');

const log = new DummyRequestLogger();
const mongoTestClient = new MongoClientInterface({});
mongoTestClient.db = new DummyMongoDB();

describe('MongoClientInterface, init behavior', () => {
let s3ConfigObj;
const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};

beforeEach(() => {
s3ConfigObj = new DummyConfigObject();
});

it('should set DataCounter transientList when declaring a ' +
'new MongoClientInterface object', () => {
s3ConfigObj.setLocationConstraints(locationConstraints);
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
});

it('should update DataCounter transientList if location constraints ' +
'are updated', done => {
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
assert.deepStrictEqual(mongoClient.dataCount.transientList, {});
const expectedRes = { locationOne: true, locationTwo: false };
s3ConfigObj.once('MongoClientTestDone', () => {
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
return done();
});
s3ConfigObj.setLocationConstraints(locationConstraints);
});
});

describe('MongoClientInterface::_handleResults', () => {
it('should return zero-result', () => {

@@ -103,140 +65,6 @@ describe('MongoClientInterface::_handleResults', () => {
});
});

describe('MongoClientInterface::_handleMongo', () => {
beforeEach(() => mongoTestClient.db.reset());

it('should return error if mongo aggregate fails', done => {
const retValues = [new Error('testError')];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log, err => {
assert(err, 'Expected error, but got success');
return done();
});
});

it('should return empty object if mongo aggregate has no results', done => {
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});

it('should return empty object if mongo aggregate has missing results',
done => {
const retValues = [[{
count: undefined,
data: undefined,
repData: undefined,
}]];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});

const testRetValues = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 1000 },
{ _id: 'locationtwo', bytes: 1000 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];

it('should return correct results, transient false', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 1000,
locationtwo: 1000,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});

it('should return correct results, transient true', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 500,
locationtwo: 500,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});

const testRetValuesNeg = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 100 },
{ _id: 'locationtwo', bytes: 100 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];
it('should return clamp negative values to 0', done => {
mongoTestClient.db.setReturnValues(testRetValuesNeg);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 0,
locationtwo: 0,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
});

describe('MongoClientInterface, misc', () => {
let s3ConfigObj;

@@ -1,103 +0,0 @@
const testError = new Error('test error');

class DummyCollection {
constructor(name, isFail) {
this.s = {
name,
};
this.fail = isFail;
this.retQueue = [];
}

setReturnValues(retArray) {
this.retQueue.push(...retArray);
}

aggregate() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}

bulkWrite(cmds, opt, cb) {
process.stdout.write('mock mongodb.bulkWrite call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}

update(target, doc, opt, cb) {
process.stdout.write('mock mongodb.update call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}

find() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}

findOne(query, opt, cb) {
if (typeof opt === 'function' && cb === undefined) {
// eslint-disable-next-line no-param-reassign
cb = opt;
}
if (this.retQueue.length <= 0) {
return cb(null);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
}
}

class DummyMongoDB {
contructor() {
this.fail = false;
this.returnQueue = [];
}

reset() {
this.fail = false;
this.returnQueue = [];
}

setReturnValues(retValues) {
this.returnQueue.push(...retValues);
}

collection(name) {
const c = new DummyCollection(name, this.fail);
c.setReturnValues(this.returnQueue);
return c;
}
}

module.exports = DummyMongoDB;
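The `DummyMongoDB` stub above (also deleted) backed the removed `_handleMongo` unit tests: each stubbed `aggregate().toArray`, `find().toArray`, or `findOne` call pops the next queued value, failing the callback when that value is an `Error`. Typical usage, reconstructed from the deleted tests; note that the misspelled `contructor` is not a real constructor, so the queue is only initialised by `reset()`:

```js
const DummyMongoDB = require('./utils/DummyMongoDB'); // path as in the old tests

const db = new DummyMongoDB();
db.reset(); // required: 'contructor' never runs, so fields start undefined
db.setReturnValues([new Error('testError')]);
db.collection('test').aggregate().toArray(err => {
    console.log(err.message); // 'testError'
});
```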