Compare commits

..

6 Commits

Author SHA1 Message Date
Alexander Chan 2f8af6352c tmp 2020-01-03 21:51:38 +00:00
Alexander Chan eb79743c44 tmp 2020-01-03 19:23:59 +00:00
Alexander Chan a9bedee48b tmp 2020-01-03 19:07:47 +00:00
Alexander Chan 239060765c [squash] changes 2019-12-30 22:42:41 +00:00
Alexander Chan f7d0406d10 [squash] changes 2019-12-28 00:50:42 +00:00
Alexander Chan c3389da37e wip: countItems 2019-12-28 00:21:58 +00:00
5 changed files with 519 additions and 1044 deletions

View File

@ -11,7 +11,6 @@
*/ */
const async = require('async'); const async = require('async');
const { EventEmitter } = require('events');
const constants = require('../../../constants'); const constants = require('../../../constants');
const { reshapeExceptionError } = require('../../../errorUtils'); const { reshapeExceptionError } = require('../../../errorUtils');
@ -27,7 +26,6 @@ const listAlgos = require('../../../algos/list/exportAlgos');
const MongoReadStream = require('./readStream'); const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils'); const MongoUtils = require('./utils');
const { DataCounter } = require('./DataCounter');
const Skip = require('../../../algos/list/skip'); const Skip = require('../../../algos/list/skip');
const USERSBUCKET = '__usersbucket'; const USERSBUCKET = '__usersbucket';
@ -40,6 +38,7 @@ const ASYNC_REPAIR_TIMEOUT = 15000;
const CONNECT_TIMEOUT_MS = 5000; const CONNECT_TIMEOUT_MS = 5000;
// MongoDB default // MongoDB default
const SOCKET_TIMEOUT_MS = 360000; const SOCKET_TIMEOUT_MS = 360000;
const CONCURRENT_CURSORS = 10;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID; const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
@ -92,7 +91,7 @@ function generatePHDVersion(versionId) {
class MongoClientInterface { class MongoClientInterface {
constructor(params) { constructor(params) {
const { replicaSetHosts, writeConcern, replicaSet, readPreference, path, const { replicaSetHosts, writeConcern, replicaSet, readPreference, path,
database, logger, replicationGroupId, authCredentials, config, database, logger, replicationGroupId, authCredentials,
isLocationTransient } = params; isLocationTransient } = params;
const cred = MongoUtils.credPrefix(authCredentials); const cred = MongoUtils.credPrefix(authCredentials);
this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` + this.mongoUrl = `mongodb://${cred}${replicaSetHosts}/` +
@ -104,20 +103,7 @@ class MongoClientInterface {
this.path = path; this.path = path;
this.replicationGroupId = replicationGroupId; this.replicationGroupId = replicationGroupId;
this.database = database; this.database = database;
this.dataCount = new DataCounter();
this.isLocationTransient = isLocationTransient; this.isLocationTransient = isLocationTransient;
if (config && config instanceof EventEmitter) {
this.config = config;
this.config.on('location-constraints-update', () => {
this.dataCount
.updateTransientList(this.config.locationConstraints);
if (this.config.isTest) {
this.config.emit('MongoClientTestDone');
}
});
this.dataCount.updateTransientList(this.config.locationConstraints);
}
} }
setup(cb) { setup(cb) {
@ -136,6 +122,10 @@ class MongoClientInterface {
const socketTimeoutMS = Number.parseInt( const socketTimeoutMS = Number.parseInt(
process.env.MONGO_SOCKET_TIMEOUT_MS, 10) || SOCKET_TIMEOUT_MS; process.env.MONGO_SOCKET_TIMEOUT_MS, 10) || SOCKET_TIMEOUT_MS;
const options = { connectTimeoutMS, socketTimeoutMS }; const options = { connectTimeoutMS, socketTimeoutMS };
if (process.env.MONGO_POOL_SIZE &&
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
}
return MongoClient.connect(this.mongoUrl, options, (err, client) => { return MongoClient.connect(this.mongoUrl, options, (err, client) => {
if (err) { if (err) {
this.logger.error('error connecting to mongodb', this.logger.error('error connecting to mongodb',
@ -1149,7 +1139,7 @@ class MongoClientInterface {
* get bucket related information for count items, used by cloudserver * get bucket related information for count items, used by cloudserver
* and s3utils * and s3utils
*/ */
_getBucketInfos(log, cb) { getBucketInfos(log, cb) {
let bucketCount = 0; let bucketCount = 0;
const bucketInfos = []; const bucketInfos = [];
@ -1198,7 +1188,7 @@ class MongoClientInterface {
} }
countItems(log, cb) { countItems(log, cb) {
this._getBucketInfos(log, (err, res) => { this.getBucketInfos(log, (err, res) => {
if (err) { if (err) {
log.error('error getting bucket info', { log.error('error getting bucket info', {
method: 'countItems', method: 'countItems',
@ -1230,6 +1220,25 @@ class MongoClientInterface {
}); });
} }
consolidateData(store, dataManaged) {
if (dataManaged && dataManaged.locations && dataManaged.total) {
const locations = dataManaged.locations;
store.dataManaged.total.curr += dataManaged.total.curr;
store.dataManaged.total.prev += dataManaged.total.prev;
Object.keys(locations).forEach(site => {
if (!store.dataManaged.byLocation[site]) {
store.dataManaged.byLocation[site] =
Object.assign({}, locations[site]);
} else {
store.dataManaged.byLocation[site].curr +=
locations[site].curr;
store.dataManaged.byLocation[site].prev +=
locations[site].prev;
}
});
}
}
scanItemCount(log, cb) { scanItemCount(log, cb) {
const store = { const store = {
objects: 0, objects: 0,
@ -1243,26 +1252,10 @@ class MongoClientInterface {
stalled: 0, stalled: 0,
}; };
const consolidateData = dataManaged => { const consolidateData = (dataManaged) =>
if (dataManaged && dataManaged.locations && dataManaged.total) { this.consolidateData(store, dataManaged);
const locations = dataManaged.locations;
store.dataManaged.total.curr += dataManaged.total.curr;
store.dataManaged.total.prev += dataManaged.total.prev;
Object.keys(locations).forEach(site => {
if (!store.dataManaged.byLocation[site]) {
store.dataManaged.byLocation[site] =
Object.assign({}, locations[site]);
} else {
store.dataManaged.byLocation[site].curr +=
locations[site].curr;
store.dataManaged.byLocation[site].prev +=
locations[site].prev;
}
});
}
};
this._getBucketInfos(log, (err, res) => { this.getBucketInfos(log, (err, res) => {
if (err) { if (err) {
log.error('error getting bucket info', { log.error('error getting bucket info', {
method: 'scanItemCount', method: 'scanItemCount',
@ -1283,42 +1276,50 @@ class MongoClientInterface {
store.buckets = bucketCount; store.buckets = bucketCount;
store.bucketList = retBucketInfos; store.bucketList = retBucketInfos;
return async.eachLimit(bucketInfos, 10, (bucketInfo, done) => { let concurrentCursors = CONCURRENT_CURSORS;
async.waterfall([ if (process.env.CONCURRENT_CURSORS &&
next => this._getIsTransient(bucketInfo, log, next), !Number.isNaN(process.env.CONCURRENT_CURSORS)) {
(isTransient, next) => { concurrentCursors = Number
const bucketName = bucketInfo.getName(); .parseInt(process.env.CONCURRENT_CURSORS, 10);
this.getObjectMDStats(bucketName, bucketInfo, }
isTransient, log, next);
}, return async.eachLimit(bucketInfos, concurrentCursors,
], (err, results) => { (bucketInfo, done) => {
async.waterfall([
next => this._getIsTransient(bucketInfo, log, next),
(isTransient, next) => {
const bucketName = bucketInfo.getName();
this.getObjectMDStats(bucketName, bucketInfo,
isTransient, log, next);
},
], (err, results) => {
if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) { if (err) {
return done(err);
}
if (results.dataManaged) {
store.objects += results.objects;
store.versions += results.versions;
store.stalled += results.stalled;
consolidateData(results.dataManaged);
}
return done();
});
}, err => {
if (err) {
return cb(err);
}
// save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err); return cb(err);
} }
return cb(null, store); // save to infostore
return this.updateCountItems(store, log, err => {
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
return cb(null, store);
});
}); });
});
}); });
return undefined; return undefined;
} }
@ -1330,7 +1331,6 @@ class MongoClientInterface {
this.isLocationTransient(locConstraint, log, cb); this.isLocationTransient(locConstraint, log, cb);
return; return;
} }
this._pensieveLocationIsTransient(locConstraint, log, cb); this._pensieveLocationIsTransient(locConstraint, log, cb);
} }
@ -1415,211 +1415,136 @@ class MongoClientInterface {
}; };
} }
_handleCount(entry) { /**
return entry && entry.count > 0 ? entry.count : 0; * @param{object} entry -
} * @param{string} entry._id -
* @param{object} entry.value -
_handleEntries(entries) { * @param{boolean} isTransient -
* @returns{object.<string, number>} results -
*/
_processEntryData(entry, isTransient) {
const results = {}; const results = {};
if (entries) { if (!isTransient ||
entries.forEach(entry => { entry.value.replicationInfo.status !== 'COMPLETED') {
results[entry._id] = entry.bytes; if (results[entry.value.dataStoreName]) {
}); results[entry.value.dataStoreName] +=
entry.value['content-length'];
} else {
results[entry.value.dataStoreName] =
entry.value['content-length'];
}
} else {
if (!results[entry.value.dataStoreName]) {
results[entry.value.dataStoreName] = 0;
}
} }
entry.value.replicationInfo.backends.forEach(rep => {
if (rep.status === 'COMPLETED') {
if (results[rep.site]) {
results[rep.site] += entry.value['content-length'];
} else {
results[rep.site] = entry.value['content-length'];
}
}
});
return results; return results;
} }
_handleMongo(c, filter, isTransient, log, cb) { /**
const reducedFields = { * @param{object} entry -
'_id': 1, * @param{string} entry._id -
'value.versionId': 1, * @param{object} entry.value -
'value.replicationInfo.status': 1, * @param{Date} cmpDate -
'value.replicationInfo.backends': 1, * @returns{boolean} stalled -
'value.content-length': 1, */
'value.dataStoreName': 1, _processEntryStalled(entry, cmpDate) {
}; if (entry.value.replicationInfo.status === 'PENDING') {
return false;
const aggCount = [ }
{ $project: { _id: 1 } }, const lastModified = Date.parse(entry.value['last-modified'] || null);
{ $group: { _id: null, count: { $sum: 1 } } }, if (isNaN(lastModified) || new Date(lastModified) > cmpDate) {
]; return false;
}
const aggData = [ return true;
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
} },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];
const aggRepData = [
{ $project: {
'value.replicationInfo.backends': 1,
'value.content-length': 1,
} },
{ $unwind: '$value.replicationInfo.backends' },
{ $match: {
'value.replicationInfo.backends.status': { $eq: 'COMPLETED' },
} },
{ $group: {
_id: '$value.replicationInfo.backends.site',
bytes: { $sum: '$value.content-length' },
} },
];
const aggCompleted = [
{ $project: {
'value.dataStoreName': 1,
'value.content-length': 1,
'inComplete': {
$eq: ['$value.replicationInfo.status', 'COMPLETED'],
},
} },
{ $match: { inComplete: true } },
{ $group: {
_id: '$value.dataStoreName',
bytes: { $sum: '$value.content-length' },
} },
];
return c.aggregate([
{ $project: reducedFields },
{ $match: filter },
{ $facet: {
count: aggCount,
data: aggData,
repData: aggRepData,
compData: aggCompleted,
} },
]).toArray((err, res) => {
if (err) {
log.error('Error when processing mongo entries', {
method: '_handleMongo',
error: err,
});
return cb(err);
}
if (!res || res.length < 1) {
log.debug('aggregate returned empty results', {
method: '_handleMongo',
});
return cb(null, {});
}
const agg = res[0];
if (!agg || !agg.count || !agg.data || !agg.repData) {
log.debug('missing field in aggregate results', {
method: '_handleMongo',
});
return cb(null, {});
}
const retResult = {
count: this._handleCount(agg.count[0] || {}),
data: this._handleEntries(agg.data),
};
const repDataEntries = this._handleEntries(agg.repData);
Object.keys(repDataEntries).forEach(site => {
if (!retResult.data[site]) {
retResult.data[site] = 0;
}
retResult.data[site] += repDataEntries[site];
});
if (isTransient && agg.compData) {
const compDataEntries = this._handleEntries(agg.compData);
Object.keys(compDataEntries).forEach(site => {
if (retResult.data[site]) {
retResult.data[site] -= compDataEntries[site];
retResult.data[site] =
Math.max(0, retResult.data[site]);
}
});
}
return cb(null, retResult);
});
}
_getStalled(c, cmpDate, log, cb) {
const reducedFields = {
'_id': {
id: '$_id',
status: '$value.replicationInfo.status',
},
'value.last-modified': 1,
};
return c.aggregate([
{ $project: reducedFields },
{ $match: {
'_id.id': { $regex: /\0/ },
'_id.status': { $eq: 'PENDING' },
} },
]).toArray((err, res) => {
if (err) {
log.debug('Unable to retrieve stalled entries', {
error: err,
});
return cb(null);
}
const count = res.filter(data => {
if (!data || typeof data !== 'object' ||
!data.value || typeof data.value !== 'object') {
return false;
}
const time = data.value['last-modified'] || null;
if (isNaN(Date.parse(time))) {
return false;
}
const testDate = new Date(time);
return testDate <= cmpDate;
}).length;
return cb(null, count);
});
} }
/*
* scan and process a single collection (bucket)
*/
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) { getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
const mstFilter = { const cursor = c.find({}, {
'_id': { $regex: /^[^\0]+$/ }, sort: { _id: 1 },
'value.versionId': { $exists: true }, projection: {
'_id': 1,
'value.last-modified': 1,
'value.replicationInfo': 1,
'value.dataStoreName': 1,
'value.content-length': 1,
'value.versionId': 1,
},
});
const collRes = {
masterCount: 0,
masterData: {},
nullCount: 0,
nullData: {},
versionCount: 0,
versionData: {},
}; };
const verFilter = { _id: { $regex: /\0/ } }; let stalledCount = 0;
const nullFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: false },
};
const cmpDate = new Date(); const cmpDate = new Date();
cmpDate.setHours(cmpDate.getHours() - 1); cmpDate.setHours(cmpDate.getHours() - 1);
async.parallel({ cursor.forEach(
version: done => res => {
this._handleMongo(c, verFilter, isTransient, log, done), const data = this._processEntryData(res, isTransient);
null: done => let targetCount;
this._handleMongo(c, nullFilter, isTransient, log, done), let targetData;
master: done => if (res._id.indexOf('\0') !== -1) {
this._handleMongo(c, mstFilter, isTransient, log, done), // versioned item
stalled: done => targetCount = 'versionCount';
this._getStalled(c, cmpDate, log, done), targetData = 'versionData';
}, (err, res) => {
if (err) { if (res.value.replicationInfo.backends.length > 0 &&
return callback(err); this._processEntryStalled(res, cmpDate)) {
stalledCount++;
}
} else if (!!res.value.versionId) {
// master version
targetCount = 'masterCount';
targetData = 'masterData';
} else {
// null version
targetCount = 'nullCount';
targetData = 'nullData';
}
collRes[targetCount]++;
Object.keys(data).forEach(site => {
if (collRes[targetData][site]) {
collRes[targetData][site] += data[site];
} else {
collRes[targetData][site] = data[site];
}
});
},
err => {
if (err) {
log.error('Error when processing mongo entries', {
method: 'getObjectMDStats',
error: err,
});
return callback(err);
}
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus &&
(bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(collRes, isVer);
retResult.stalled = stalledCount;
return callback(null, retResult);
} }
const resObj = { );
masterCount: res.master.count || 0,
masterData: res.master.data || {},
nullCount: res.null.count || 0,
nullData: res.null.data || {},
versionCount: res.version.count || 0,
versionData: res.version.data || {},
};
const bucketStatus = bucketInfo.getVersioningConfiguration();
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
bucketStatus.Status === 'Suspended'));
const retResult = this._handleResults(resObj, isVer);
retResult.stalled = res.stalled || 0;
return callback(null, retResult);
});
} }
getIngestionBuckets(log, cb) { getIngestionBuckets(log, cb) {
@ -1748,3 +1673,4 @@ class MongoClientInterface {
} }
module.exports = MongoClientInterface; module.exports = MongoClientInterface;

View File

@ -1,11 +1,14 @@
const assert = require('assert');
const async = require('async'); const async = require('async');
const assert = require('assert');
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const { MongoMemoryReplSet } = require('mongodb-memory-server'); const { MongoMemoryReplSet } = require('mongodb-memory-server');
const errors = require('../../../../lib/errors'); const errors = require('../../../../lib/errors');
const logger = new werelogs.Logger('MongoClientInterface', 'debug', 'debug'); const logger = new werelogs.Logger('MongoClientInterface', 'debug', 'debug');
const BucketInfo = require('../../../../lib/models/BucketInfo'); const BucketInfo = require('../../../../lib/models/BucketInfo');
const ObjectMD = require('../../../../lib/models/ObjectMD');
const MongoClientInterface =
require('../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const MetadataWrapper = const MetadataWrapper =
require('../../../../lib/storage/metadata/MetadataWrapper'); require('../../../../lib/storage/metadata/MetadataWrapper');
@ -314,3 +317,331 @@ describe('MongoClientInterface', () => {
})); }));
}); });
}); });
function isTransientFn(locationName, log, callback) {
if (locationName.startsWith('transient')) {
return callback(null, true);
}
return callback(null, false);
}
function createBucketMD(locationName, bucketName, isVersioned, creationDate) {
const bucketObj = {
'_acl': {
'Canned': 'private',
'FULL_CONTROL': [],
'WRITE': [],
'WRITE_ACP': [],
'READ': [],
'READ_ACP': []
},
'_name': bucketName,
'_owner': 'testOwner',
'_ownerDisplayName':'testOwnerDisplayName',
'_creationDate': creationDate,
'_mdBucketModelVersion': 10,
'_transient': false,
'_deleted': false,
'_serverSideEncryption': null,
'_versioningConfiguration': isVersioned === null
? null
: { Status: isVersioned ? 'Enabled' : 'Suspended' },
'_locationConstraint': locationName,
'_readLocationConstraint': null,
'_cors': null,
'_replicationConfiguration': null,
'_lifecycleConfiguration': null,
'_uid': '',
'_isNFS': null,
'ingestion': null
};
return BucketInfo.fromObj(bucketObj);
}
function createObjectMD(params) {
const {
locationName,
objectKey,
replicationInfo,
lastModified,
size,
versionId,
} = params;
return new ObjectMD()
.setLastModified(lastModified)
.setContentLength(size)
.setDataStoreName(locationName)
.setReplicationInfo(replicationInfo)
.setVersionId(versionId)
.setKey(objectKey);
}
function createReplicationInfo(backends, status) {
return {
status,
backends,
content: [],
destination: '',
storageClass: '',
role: '',
storageType: '',
dataStoreVersionId: '',
isNFS: null,
};
}
describe.only('MongoClientInterface::scanItemCount', () => {
let mongoc;
before(done => {
mongoserver.waitUntilRunning().then(() => {
const opts = {
isLocationTransient: isTransientFn,
replicaSetHosts: 'localhost:27018',
writeConcern: 'majority',
replicaSet: 'rs0',
readPreference: 'primary',
database: dbName,
logger
};
mongoc = new MongoClientInterface(opts);
mongoc.setup(done);
});
});
after(done => {
async.series([
next => mongoc.close(next),
next => mongoserver.stop()
.then(() => next())
.catch(next),
], done);
});
it('should return correct count with transient location (COMPLETED)', done => {
const bucketName = 'test-bucket';
const location = 'transient-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([
{site: 'loc1', status: 'COMPLETED'},
{site: 'loc2', status: 'COMPLETED'},
], 'COMPLETED'),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 2, prev: 0 },
byLocation: {
'transient-loc-1': { curr: 0, prev: 0 },
'loc1': { curr: 1, prev: 0 },
'loc2': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
it('should return correct count with transient location (PENDING)', done => {
const bucketName = 'test-bucket';
const location = 'transient-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([
{site: 'loc1', status: 'COMPLETED'},
{site: 'loc2', status: 'COMPLETED'},
], 'PENDING'),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 3, prev: 0 },
byLocation: {
'transient-loc-1': { curr: 1, prev: 0 },
'loc1': { curr: 1, prev: 0 },
'loc2': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
it('should correctly count null objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 0 },
byLocation: {
'test-loc-1': { curr: 1, prev: 0 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, false, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: false },
logger,
next),
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
it('should correctly count versioned objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 1 },
byLocation: {
'test-loc-1': { curr: 1, prev: 1 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00001'},
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00002' },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
it('should correctly count stalled objects', done => {
const bucketName = 'test-bucket';
const location = 'test-loc-1';
const createDate = new Date().toJSON();
const objectName = 'test-object';
const versionId = '00001';
const params = {
locationName: location,
objectKey: objectName,
replicationInfo: createReplicationInfo([], ''),
size: 1,
versionId,
lastModified: new Date().toJSON(),
};
const expected = {
total: { curr: 1, prev: 1 },
byLocation: {
'test-loc-1': { curr: 1, prev: 1 },
},
};
async.series([
next => mongoc.createBucket(
bucketName,
createBucketMD(location, bucketName, true, createDate),
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00001'},
logger,
next),
next => mongoc.putObject(
bucketName,
objectName,
createObjectMD(params).getValue(),
{ versioning: true, versionId: '00002' },
logger,
next),
next => scanAndTest(mongoc, expected, next),
], done);
});
});
function scanAndTest(mongoc, expected, callback) {
async.series([
next => mongoc.scanItemCount(logger, next),
next => mongoc.countItems(logger, (err, res) => {
if (err) {
return next(err);
}
console.log(res)
assert.deepStrictEqual(res.dataManaged, expected);
return next();
}),
], callback);
}

View File

@ -1,507 +0,0 @@
const assert = require('assert');
const {
NEW_OBJ,
NEW_VER,
UPDATE_VER,
UPDATE_MST,
RESTORE,
DEL_VER,
DEL_MST,
DataCounter,
} = require('../../../../../lib/storage/metadata/mongoclient/DataCounter');
const refZeroObj = {
objects: 0,
versions: 0,
dataManaged: {
total: { curr: 0, prev: 0 },
byLocation: {},
},
stalled: 0,
};
const refSingleObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
},
},
stalled: 0,
};
const refSingleObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 100, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
},
},
stalled: 0,
};
const refMultiObjVer = {
objects: 1,
versions: 1,
dataManaged: {
total: { curr: 200, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 100, prev: 100 },
},
},
stalled: 0,
};
const refMultiObj = {
objects: 2,
versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
stalled: 0,
};
const singleSite = size => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [],
},
});
const multiSite = (size, isComplete) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': {
backends: [{
site: 'locationTwo',
status: isComplete ? 'COMPLETED' : 'PENDING',
}],
},
});
const transientSite = (size, status, backends) => ({
'content-length': size,
'dataStoreName': 'locationOne',
'replicationInfo': { status, backends },
});
const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};
const dataCounter = new DataCounter();
describe('DataCounter Class', () => {
it('should create a zero object', () => {
dataCounter.set(refZeroObj);
assert.deepStrictEqual(dataCounter.results(), refZeroObj);
});
it('should skip dataCounter methods if initial values are not set', () => {
const testCounter = new DataCounter();
testCounter.addObject(singleSite(100), null, NEW_OBJ);
assert.deepStrictEqual(testCounter.results(), refZeroObj);
});
});
describe('DateCounter::updateTransientList', () => {
afterEach(() => dataCounter.updateTransientList({}));
it('should set transient list', () => {
assert.deepStrictEqual(dataCounter.transientList, {});
dataCounter.updateTransientList(locationConstraints);
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(dataCounter.transientList, expectedRes);
});
});
describe('DataCounter::addObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, new object one site',
init: refZeroObj,
input: [singleSite(100), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new object multi site',
init: refZeroObj,
input: [multiSite(100, true), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 250, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, overwrite multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_OBJ],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 500, prev: 0 },
byLocation: {
locationOne: { curr: 250, prev: 0 },
locationTwo: { curr: 250, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version single site',
init: refSingleObj,
input: [singleSite(100), singleSite(50), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 250, prev: 50 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly update DataCounter, new version multi site',
init: refMultiObj,
input: [multiSite(100, true), multiSite(50, true), NEW_VER],
expectedRes: {
objects: 2, versions: 1,
dataManaged: {
total: { curr: 500, prev: 100 },
byLocation: {
locationOne: { curr: 250, prev: 50 },
locationTwo: { curr: 250, prev: 50 },
},
},
},
},
{
it: 'should correctly ignore pending status, multi site',
init: refZeroObj,
input: [multiSite(100, false), null, NEW_OBJ],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 100, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in master object',
init: refSingleObj,
input: [multiSite(100, true), multiSite(100, false), UPDATE_MST],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'replication completion update in versioned object',
init: refSingleObjVer,
input: [multiSite(100, true), multiSite(100, false), UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'restoring versioned object as master',
init: refMultiObjVer,
input: [multiSite(100, true), multiSite(100, true), RESTORE],
expectedRes: {
objects: 2, versions: 0,
dataManaged: {
total: { curr: 400, prev: 0 },
byLocation: {
locationOne: { curr: 200, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
describe('DataCounter, update with transient location', () => {
before(() => dataCounter.updateTransientList(locationConstraints));
after(() => dataCounter.updateTransientList({}));
const pCurrMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'COMPLETED' },
]);
const cCurrMD = transientSite(100, 'COMPLETED', [
{ site: 'site1', status: 'COMPLETED' },
{ site: 'site2', status: 'COMPLETED' },
]);
const prevMD = transientSite(100, 'PENDING', [
{ site: 'site1', status: 'PENDING' },
{ site: 'site2', status: 'PENDING' },
]);
const transientTest = [
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'version object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_VER],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 100, prev: 200 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
site1: { curr: 0, prev: 100 },
site2: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = PENDING',
init: refSingleObjVer,
input: [pCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 100 },
site2: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'master object, replication status = COMPLETED',
init: refSingleObjVer,
input: [cCurrMD, prevMD, UPDATE_MST],
expectedRes: {
objects: 1, versions: 1,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
site1: { curr: 100, prev: 0 },
site2: { curr: 100, prev: 0 },
},
},
},
},
];
transientTest.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.addObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});
describe('DataCounter::delObject', () => {
const tests = [
{
it: 'should correctly update DataCounter, ' +
'delete master object single site',
init: refMultiObj,
input: [singleSite(100), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 300, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 200, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete master object multi site',
init: refMultiObj,
input: [multiSite(100, true), DEL_MST],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object single site',
init: refMultiObjVer,
input: [singleSite(100), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 100 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 100 },
},
},
},
},
{
it: 'should correctly update DataCounter, ' +
'delete versioned object multi site',
init: refMultiObjVer,
input: [multiSite(100, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
{
it: 'should clamp negative values to 0, master object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_MST],
expectedRes: {
objects: 0, versions: 1,
dataManaged: {
total: { curr: 0, prev: 200 },
byLocation: {
locationOne: { curr: 0, prev: 100 },
locationTwo: { curr: 0, prev: 100 },
},
},
},
},
{
it: 'should clamp negative values to 0, versioned object',
init: refMultiObjVer,
input: [multiSite(300, true), DEL_VER],
expectedRes: {
objects: 1, versions: 0,
dataManaged: {
total: { curr: 200, prev: 0 },
byLocation: {
locationOne: { curr: 100, prev: 0 },
locationTwo: { curr: 100, prev: 0 },
},
},
},
},
];
tests.forEach(test => it(test.it, () => {
const { expectedRes, input, init } = test;
dataCounter.set(init);
dataCounter.delObject(...input);
const testResults = dataCounter.results();
Object.keys(expectedRes).forEach(key => {
if (typeof expectedRes[key] === 'object') {
assert.deepStrictEqual(testResults[key], expectedRes[key]);
} else {
assert.strictEqual(testResults[key], expectedRes[key]);
}
});
}));
});

View File

@ -2,47 +2,9 @@ const assert = require('assert');
const MongoClientInterface = require( const MongoClientInterface = require(
'../../../../../lib/storage/metadata/mongoclient/MongoClientInterface'); '../../../../../lib/storage/metadata/mongoclient/MongoClientInterface');
const DummyMongoDB = require('./utils/DummyMongoDB');
const DummyConfigObject = require('./utils/DummyConfigObject'); const DummyConfigObject = require('./utils/DummyConfigObject');
const DummyRequestLogger = require('./utils/DummyRequestLogger');
const log = new DummyRequestLogger();
const mongoTestClient = new MongoClientInterface({}); const mongoTestClient = new MongoClientInterface({});
mongoTestClient.db = new DummyMongoDB();
describe('MongoClientInterface, init behavior', () => {
let s3ConfigObj;
const locationConstraints = {
locationOne: { isTransient: true },
locationTwo: { isTransient: false },
};
beforeEach(() => {
s3ConfigObj = new DummyConfigObject();
});
it('should set DataCounter transientList when declaring a ' +
'new MongoClientInterface object', () => {
s3ConfigObj.setLocationConstraints(locationConstraints);
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
const expectedRes = { locationOne: true, locationTwo: false };
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
});
it('should update DataCounter transientList if location constraints ' +
'are updated', done => {
const mongoClient = new MongoClientInterface({ config: s3ConfigObj });
assert.deepStrictEqual(mongoClient.dataCount.transientList, {});
const expectedRes = { locationOne: true, locationTwo: false };
s3ConfigObj.once('MongoClientTestDone', () => {
assert.deepStrictEqual(
mongoClient.dataCount.transientList, expectedRes);
return done();
});
s3ConfigObj.setLocationConstraints(locationConstraints);
});
});
describe('MongoClientInterface::_handleResults', () => { describe('MongoClientInterface::_handleResults', () => {
it('should return zero-result', () => { it('should return zero-result', () => {
@ -103,140 +65,6 @@ describe('MongoClientInterface::_handleResults', () => {
}); });
}); });
describe('MongoClientInterface::_handleMongo', () => {
beforeEach(() => mongoTestClient.db.reset());
it('should return error if mongo aggregate fails', done => {
const retValues = [new Error('testError')];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log, err => {
assert(err, 'Expected error, but got success');
return done();
});
});
it('should return empty object if mongo aggregate has no results', done => {
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});
it('should return empty object if mongo aggregate has missing results',
done => {
const retValues = [[{
count: undefined,
data: undefined,
repData: undefined,
}]];
mongoTestClient.db.setReturnValues(retValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {});
return done();
});
});
const testRetValues = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 1000 },
{ _id: 'locationtwo', bytes: 1000 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];
it('should return correct results, transient false', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, false, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 1000,
locationtwo: 1000,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
it('should return correct results, transient true', done => {
mongoTestClient.db.setReturnValues(testRetValues);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 500,
locationtwo: 500,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
const testRetValuesNeg = [[{
count: [{ _id: null, count: 100 }],
data: [
{ _id: 'locationone', bytes: 100 },
{ _id: 'locationtwo', bytes: 100 },
],
repData: [
{ _id: 'awsbackend', bytes: 500 },
{ _id: 'azurebackend', bytes: 500 },
{ _id: 'gcpbackend', bytes: 500 },
],
compData: [
{ _id: 'locationone', bytes: 500 },
{ _id: 'locationtwo', bytes: 500 },
],
}]];
it('should return clamp negative values to 0', done => {
mongoTestClient.db.setReturnValues(testRetValuesNeg);
const testCollection = mongoTestClient.db.collection('test');
mongoTestClient._handleMongo(testCollection, {}, true, log,
(err, res) => {
assert.ifError(err, `Expected success, but got error ${err}`);
assert.deepStrictEqual(res, {
count: 100,
data: {
locationone: 0,
locationtwo: 0,
awsbackend: 500,
azurebackend: 500,
gcpbackend: 500,
},
});
return done();
});
});
});
describe('MongoClientInterface, misc', () => { describe('MongoClientInterface, misc', () => {
let s3ConfigObj; let s3ConfigObj;

View File

@ -1,103 +0,0 @@
const testError = new Error('test error');
class DummyCollection {
constructor(name, isFail) {
this.s = {
name,
};
this.fail = isFail;
this.retQueue = [];
}
setReturnValues(retArray) {
this.retQueue.push(...retArray);
}
aggregate() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}
bulkWrite(cmds, opt, cb) {
process.stdout.write('mock mongodb.bulkWrite call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}
update(target, doc, opt, cb) {
process.stdout.write('mock mongodb.update call\n');
if (this.fail) {
return cb(testError);
}
return cb();
}
find() {
return {
toArray: cb => {
if (this.retQueue.length <= 0) {
return cb(null, []);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
},
};
}
findOne(query, opt, cb) {
if (typeof opt === 'function' && cb === undefined) {
// eslint-disable-next-line no-param-reassign
cb = opt;
}
if (this.retQueue.length <= 0) {
return cb(null);
}
const retVal = this.retQueue[0];
this.retQueue = this.retQueue.slice(1);
if (retVal instanceof Error) {
return cb(retVal);
}
return cb(null, retVal);
}
}
class DummyMongoDB {
contructor() {
this.fail = false;
this.returnQueue = [];
}
reset() {
this.fail = false;
this.returnQueue = [];
}
setReturnValues(retValues) {
this.returnQueue.push(...retValues);
}
collection(name) {
const c = new DummyCollection(name, this.fail);
c.setReturnValues(this.returnQueue);
return c;
}
}
module.exports = DummyMongoDB;