Compare commits

...

1 Commit

Author SHA1 Message Date
Alexander Chan 5616139899 rf: use mongodb aggregate method for item count 2018-06-12 20:31:30 -07:00
1 changed file with 153 additions and 212 deletions


@@ -965,12 +965,11 @@ class MongoClientInterface {
                 if (err) {
                     return next(errors.InternalError);
                 }
-                if (results.bucketList && results.counter &&
-                    results.counter.dataManaged) {
+                if (results.bucketList && results.dataManaged) {
                     res.bucketList.push(results.bucketList);
-                    res.objects += results.counter.objects;
-                    res.versions += results.counter.versions;
-                    consolidateData(results.counter.dataManaged);
+                    res.objects += results.objects;
+                    res.versions += results.versions;
+                    consolidateData(results.dataManaged);
                 }
                 return next();
             });
@@ -986,166 +985,55 @@ class MongoClientInterface {
         return undefined;
     }
 
-    getDataManaged(c, query, log, cb) {
-        const dataManaged = {
-            total: 0,
-            locations: {},
-        };
-        c.find(query).toArray((err, entries) => {
-            if (err) {
-                log.error('error occured in mongo client', {
-                    method: 'getDataManaged',
-                    error: err,
-                });
-                return cb(err);
-            }
-            entries.forEach(entry => {
-                const { backends } = entry.value.replicationInfo;
-                const objMD = entry.value;
-                dataManaged.total += objMD['content-length'];
-                if (!dataManaged.locations[objMD.dataStoreName]) {
-                    dataManaged.locations[objMD.dataStoreName] =
-                        objMD['content-length'];
-                } else {
-                    dataManaged.locations[objMD.dataStoreName] +=
-                        objMD['content-length'];
-                }
-                // replication list
-                backends.forEach(location => {
-                    const { site } = location;
-                    if (!dataManaged.locations[site]) {
-                        dataManaged.locations[site] = objMD['content-length'];
-                    } else {
-                        dataManaged.locations[site] += objMD['content-length'];
-                    }
-                });
-            });
-            return cb(null, {
-                objCount: entries.length,
-                dataManaged,
-            });
-        });
-    }
-
-    getDataInfoNoVer(c, log, callback) {
-        return this.getDataManaged(c, {}, log, (err, res) => {
-            if (err) {
-                log.error('error occured in mongo client', {
-                    method: 'getDataInfoNoVer',
-                    error: err,
-                });
-                return callback(err);
-            }
-            const dataManaged = {
-                total: { curr: 0, prev: 0 },
-                locations: {},
-            };
-            const data = res.dataManaged.locations;
-            // add total
-            dataManaged.total.curr += res.dataManaged.total;
-            Object.keys(data).forEach(loc => {
-                if (!dataManaged.locations[loc]) {
-                    dataManaged.locations[loc] = { prev: 0, curr: 0 };
-                }
-                dataManaged.locations[loc].curr = data[loc];
-            });
-            return callback(null, {
-                objects: res.objCount,
-                versions: 0,
-                dataManaged,
-            });
-        });
-    }
-
-    getDataInfoVer(c, mstVerIds, log, callback) {
-        // query for removing placeholder entries
-        /* eslint-disable quote-props */
-        const queryFilter = {
-            '$or': [
-                { '_id': { '$regex': /\0.*$/g } },
-                { 'value.isNull': { '$exists': true } },
-                {
-                    'value.nullVersionId': { '$exists': false },
-                    'value.versionId': { '$exists': false },
-                },
-            ],
-        };
-        /* eslint-enable quote-props */
-        return async.series({
-            master: done => {
-                // query for getting only master version entries
-                const mstFilter = [];
-                /* eslint-disable quote-props */
-                mstVerIds.forEach(id => {
-                    if (id) {
-                        mstFilter.push({ 'value.versionId': { '$eq': id } });
-                    }
-                });
-                const query = {
-                    '$and': [
-                        queryFilter,
-                        { '$or': (mstFilter.length ? mstFilter : [{}]) },
-                    ],
-                };
-                /* eslint-enable quote-props */
-                return this.getDataManaged(c, query, log, done);
-            },
-            archived: done => {
-                // query for getting only versioned entries
-                const mstFilter = [];
-                /* eslint-disable quote-props */
-                mstVerIds.forEach(id => {
-                    if (id) {
-                        mstFilter.push({ 'value.versionId': { '$ne': id } });
-                    }
-                });
-                const query = {
-                    '$and': [
-                        queryFilter,
-                        { '$and': (mstFilter.length ? mstFilter : [{}]) },
-                    ],
-                };
-                /* eslint-enable quote-props */
-                return this.getDataManaged(c, query, log, done);
-            },
-        }, (err, res) => {
-            if (err) {
-                log.error('error occured in mongo client', {
-                    method: 'getDataInfoVer',
-                    error: err,
-                });
-                return callback(err);
-            }
-            const dataManaged = {
-                total: { curr: 0, prev: 0 },
-                locations: {},
-            };
-            const mstData = res.master.dataManaged.locations;
-            const verData = res.archived.dataManaged.locations;
-            // add total
-            dataManaged.total.curr += res.master.dataManaged.total;
-            dataManaged.total.prev += res.archived.dataManaged.total;
-            Object.keys(mstData).forEach(loc => {
-                if (!dataManaged.locations[loc]) {
-                    dataManaged.locations[loc] = { prev: 0, curr: 0 };
-                }
-                dataManaged.locations[loc].curr = mstData[loc];
-            });
-            Object.keys(verData).forEach(loc => {
-                if (!dataManaged.locations[loc]) {
-                    dataManaged.locations[loc] = { prev: 0, curr: 0 };
-                }
-                dataManaged.locations[loc].prev = verData[loc];
-            });
-            return callback(null, {
-                objects: res.master.objCount,
-                versions: res.archived.objCount,
-                dataManaged,
-            });
-        });
-    }
+    _handleResults(res, isVersioned) {
+        const total = { curr: 0, prev: 0 };
+        const locations = {};
+        Object.keys(res.nullData).forEach(loc => {
+            const bytes = res.nullData[loc];
+            const locName = loc === 'mem' || loc === 'file' ?
+                'us-east-1' : loc;
+            if (!locations[locName]) {
+                locations[locName] = { curr: 0, prev: 0 };
+            }
+            total.curr += bytes;
+            locations[locName].curr += bytes;
+        });
+        if (isVersioned) {
+            Object.keys(res.versionData).forEach(loc => {
+                const bytes = res.versionData[loc];
+                const locName = loc === 'mem' || loc === 'file' ?
+                    'us-east-1' : loc;
+                if (!locations[locName]) {
+                    locations[locName] = { curr: 0, prev: 0 };
+                }
+                total.prev += bytes;
+                locations[locName].prev += bytes;
+            });
+        }
+        Object.keys(res.masterData).forEach(loc => {
+            const bytes = res.masterData[loc];
+            const locName = loc === 'mem' || loc === 'file' ?
+                'us-east-1' : loc;
+            if (!locations[locName]) {
+                locations[locName] = { curr: 0, prev: 0 };
+            }
+            total.curr += bytes;
+            locations[locName].curr += bytes;
+            if (isVersioned) {
+                total.prev -= bytes;
+                locations[locName].prev -= bytes;
+            }
+        });
+        return {
+            versions: isVersioned ?
+                res.versionCount - res.masterCount : 0,
+            objects: res.masterCount + res.nullCount,
+            dataManaged: {
+                total,
+                locations,
+            },
+        };
+    }
 
     getObjectMDStats(bucketName, bucketInfo, log, callback) {
@@ -1154,60 +1042,113 @@ class MongoClientInterface {
             name: bucketName,
             location: bucketInfo.getLocationConstraint(),
         };
-        if (bucketInfo && bucketInfo._versioningConfiguration &&
-            (bucketInfo._versioningConfiguration.Status === 'Suspended' ||
-            bucketInfo._versioningConfiguration.Status === 'Enabled')) {
-            // if versioning is enabled
-            c.distinct('_id').then(keys => {
-                const trimmedKeys = keys.map(key => key.replace(/\0.*$/g, ''));
-                const uniqKeys = trimmedKeys.filter(
-                    (key, index, self) => self.indexOf(key) === index);
-                // for each uniqKey get master version id
-                return async.map(uniqKeys, (key, done) => {
-                    this.getLatestVersion(c, key, log, (err, mst) => {
-                        if (err) {
-                            if (err.NoSuchKey) {
-                                log.debug('NoSuchKey master info', {
-                                    method: 'getObjectMDStats',
-                                    error: err,
-                                });
-                                return done();
-                            }
-                            log.error('unable to retrieve master info', {
-                                method: 'getObjectMDStats',
-                                error: err,
-                            });
-                            return done(err);
-                        }
-                        return done(null, mst.versionId);
-                    });
-                }, (err, mstVerIds) => {
-                    if (err) {
-                        return callback(err);
-                    }
-                    return this.getDataInfoVer(c, mstVerIds, log,
-                        (err, res) => {
-                            if (err) {
-                                return callback(err);
-                            }
-                            return callback(null, {
-                                bucketList: retBucketInfo,
-                                counter: res,
-                            });
-                        });
-                });
-            }).catch(callback);
-        } else {
-            this.getDataInfoNoVer(c, log, (err, res) => {
-                if (err) {
-                    return callback(err);
-                }
-                return callback(null, {
-                    bucketList: retBucketInfo,
-                    counter: res,
-                });
-            });
-        }
+        const mstFilter = {
+            '_id': { $regex: /^[^\0]+$/ },
+            'value.versionId': { $exists: true },
+        };
+        const verFilter = { _id: { $regex: /\0/ } };
+        const nullFilter = {
+            '_id': { $regex: /^[^\0]+$/ },
+            'value.versionId': { $exists: false },
+        };
+
+        const _handleCount = (err, entry, cb) => {
+            if (err) {
+                return cb(err);
+            }
+            return cb(null, entry && entry.count > 0 ? entry.count : 0);
+        };
+        const _handleEntries = (err, entries, cb) => {
+            if (err) {
+                return cb(err);
+            }
+            const results = {};
+            if (entries) {
+                entries.forEach(entry => {
+                    results[entry._id] = entry.bytes;
+                });
+            }
+            return cb(null, results);
+        };
+
+        const _handleMongo = (cursor, cb) => async.parallel({
+            count: done => {
+                const tmpCursor = cursor.clone();
+                tmpCursor.project({ _id: 1 })
+                    .group({
+                        _id: null,
+                        count: { $sum: 1 },
+                    }).next((err, res) => _handleCount(err, res, done));
+            },
+            data: done => {
+                const tmpCursor = cursor.clone();
+                tmpCursor.project({
+                    'value.dataStoreName': 1,
+                    'value.content-length': 1,
+                }).group({
+                    _id: '$value.dataStoreName',
+                    bytes: { $sum: '$value.content-length' },
+                }).toArray((err, res) => _handleEntries(err, res, done));
+            },
+            repData: done => {
+                const tmpCursor = cursor.clone();
+                tmpCursor.project({
+                    'value.replicationInfo.backends': 1,
+                    'value.content-length': 1,
+                }).unwind('$value.replicationInfo.backends')
+                    .group({
+                        _id: '$value.replicationInfo.backends.site',
+                        bytes: { $sum: '$value.content-length' },
+                    }).toArray((err, res) => _handleEntries(err, res, done));
+            },
+        }, (err, res) => {
+            if (err) {
+                log.error('Error when processing mongo entries', {
+                    method: '_handleMongo',
+                    error: err,
+                });
+                return cb(err);
+            }
+            const retResult = {
+                count: res.count,
+                data: Object.assign({}, res.data),
+            };
+            Object.keys(res.repData).forEach(site => {
+                if (!retResult.data[site]) {
+                    retResult.data[site] = 0;
+                }
+                retResult.data[site] += res.repData[site];
+            });
+            return cb(null, retResult);
+        });
+
+        const mstCursor = c.aggregate().match(mstFilter);
+        const verCursor = c.aggregate().match(verFilter);
+        const nullCursor = c.aggregate().match(nullFilter);
+        async.parallel({
+            version: done => _handleMongo(verCursor, done),
+            null: done => _handleMongo(nullCursor, done),
+            master: done => _handleMongo(mstCursor, done),
+        }, (err, res) => {
+            if (err) {
+                return callback(err);
+            }
+            const resObj = {
+                masterCount: res.master.count || 0,
+                masterData: res.master.data || {},
+                nullCount: res.null.count || 0,
+                nullData: res.null.data || {},
+                versionCount: res.version.count || 0,
+                versionData: res.version.data || {},
+            };
+            const bucketStatus = bucketInfo.getVersioningConfiguration();
+            const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
+                bucketStatus.Status === 'Suspended'));
+            const retResult = this._handleResults(resObj, isVer);
+            retResult.bucketList = retBucketInfo;
+            return callback(null, retResult);
+        });
     }
 }
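
The rewrite moves all counting and byte summation server-side: instead of find().toArray() followed by a client-side forEach over every metadata entry, each of the three cursors (master, version, null) drives $match/$group aggregation stages in MongoDB, so only per-location totals cross the wire. Below is a minimal standalone sketch of what the master-entry cursor computes, written as explicit pipelines against the stock Node.js driver rather than the AggregationCursor builder calls used in the diff; the filter and field names are taken from the diff, while the countMasters helper and the surrounding setup are hypothetical.

// Sketch (hypothetical helper; `c` is a mongodb driver collection, as in
// the diff). It reproduces the `count` and `data` branches of _handleMongo
// for master entries: $match selects keys without a null byte in _id whose
// versionId exists; $group then counts entries and sums 'content-length'
// per dataStoreName on the server.
const async = require('async');

function countMasters(c, callback) {
    const mstFilter = {
        '_id': { $regex: /^[^\0]+$/ },
        'value.versionId': { $exists: true },
    };
    async.parallel({
        count: done => c.aggregate([
            { $match: mstFilter },
            { $group: { _id: null, count: { $sum: 1 } } },
        ]).toArray((err, res) =>
            done(err, res && res.length ? res[0].count : 0)),
        data: done => c.aggregate([
            { $match: mstFilter },
            { $group: {
                _id: '$value.dataStoreName',
                bytes: { $sum: '$value.content-length' },
            } },
        ]).toArray((err, res) => {
            if (err) {
                return done(err);
            }
            const perLocation = {};
            (res || []).forEach(e => { perLocation[e._id] = e.bytes; });
            return done(null, perLocation);
        }),
    }, callback);
}

Given the three cursors' results, _handleResults derives the reported figures: on a versioned bucket with, say, 100 master entries, 250 version entries (masters included), and 2 null entries, it returns objects = 100 + 2 = 102 and versions = 250 - 100 = 150, counting master bytes as curr and the remaining version bytes as prev.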