Compare commits
1 Commits
developmen
...
refactor/m
Author | SHA1 | Date |
---|---|---|
Alexander Chan | 5616139899 |
|
@ -965,12 +965,11 @@ class MongoClientInterface {
|
|||
if (err) {
|
||||
return next(errors.InternalError);
|
||||
}
|
||||
if (results.bucketList && results.counter &&
|
||||
results.counter.dataManaged) {
|
||||
if (results.bucketList && results.dataManaged) {
|
||||
res.bucketList.push(results.bucketList);
|
||||
res.objects += results.counter.objects;
|
||||
res.versions += results.counter.versions;
|
||||
consolidateData(results.counter.dataManaged);
|
||||
res.objects += results.objects;
|
||||
res.versions += results.versions;
|
||||
consolidateData(results.dataManaged);
|
||||
}
|
||||
return next();
|
||||
});
|
||||
|
@ -986,166 +985,55 @@ class MongoClientInterface {
|
|||
return undefined;
|
||||
}
|
||||
|
||||
getDataManaged(c, query, log, cb) {
|
||||
const dataManaged = {
|
||||
total: 0,
|
||||
locations: {},
|
||||
};
|
||||
c.find(query).toArray((err, entries) => {
|
||||
if (err) {
|
||||
log.error('error occured in mongo client', {
|
||||
method: 'getDataManaged',
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
entries.forEach(entry => {
|
||||
const { backends } = entry.value.replicationInfo;
|
||||
const objMD = entry.value;
|
||||
dataManaged.total += objMD['content-length'];
|
||||
if (!dataManaged.locations[objMD.dataStoreName]) {
|
||||
dataManaged.locations[objMD.dataStoreName] =
|
||||
objMD['content-length'];
|
||||
} else {
|
||||
dataManaged.locations[objMD.dataStoreName] +=
|
||||
objMD['content-length'];
|
||||
}
|
||||
// replication list
|
||||
backends.forEach(location => {
|
||||
const { site } = location;
|
||||
if (!dataManaged.locations[site]) {
|
||||
dataManaged.locations[site] = objMD['content-length'];
|
||||
} else {
|
||||
dataManaged.locations[site] += objMD['content-length'];
|
||||
}
|
||||
});
|
||||
});
|
||||
return cb(null, {
|
||||
objCount: entries.length,
|
||||
dataManaged,
|
||||
});
|
||||
});
|
||||
}
|
||||
_handleResults(res, isVersioned) {
|
||||
const total = { curr: 0, prev: 0 };
|
||||
const locations = {};
|
||||
|
||||
getDataInfoNoVer(c, log, callback) {
|
||||
return this.getDataManaged(c, {}, log, (err, res) => {
|
||||
if (err) {
|
||||
log.error('error occured in mongo client', {
|
||||
method: 'getDataInfoNoVer',
|
||||
error: err,
|
||||
});
|
||||
return callback(err);
|
||||
Object.keys(res.nullData).forEach(loc => {
|
||||
const bytes = res.nullData[loc];
|
||||
const locName = loc === 'mem' || loc === 'file' ?
|
||||
'us-east-1' : loc;
|
||||
if (!locations[locName]) {
|
||||
locations[locName] = { curr: 0, prev: 0 };
|
||||
}
|
||||
const dataManaged = {
|
||||
total: { curr: 0, prev: 0 },
|
||||
locations: {},
|
||||
};
|
||||
const data = res.dataManaged.locations;
|
||||
// add total
|
||||
dataManaged.total.curr += res.dataManaged.total;
|
||||
Object.keys(data).forEach(loc => {
|
||||
if (!dataManaged.locations[loc]) {
|
||||
dataManaged.locations[loc] = { prev: 0, curr: 0 };
|
||||
total.curr += bytes;
|
||||
locations[locName].curr += bytes;
|
||||
});
|
||||
if (isVersioned) {
|
||||
Object.keys(res.versionData).forEach(loc => {
|
||||
const bytes = res.versionData[loc];
|
||||
const locName = loc === 'mem' || loc === 'file' ?
|
||||
'us-east-1' : loc;
|
||||
if (!locations[locName]) {
|
||||
locations[locName] = { curr: 0, prev: 0 };
|
||||
}
|
||||
dataManaged.locations[loc].curr = data[loc];
|
||||
});
|
||||
return callback(null, {
|
||||
objects: res.objCount,
|
||||
versions: 0,
|
||||
dataManaged,
|
||||
});
|
||||
total.prev += bytes;
|
||||
locations[locName].prev += bytes;
|
||||
});
|
||||
}
|
||||
|
||||
getDataInfoVer(c, mstVerIds, log, callback) {
|
||||
// query for removing placeholder entries
|
||||
/* eslint-disable quote-props */
|
||||
const queryFilter = {
|
||||
'$or': [
|
||||
{ '_id': { '$regex': /\0.*$/g } },
|
||||
{ 'value.isNull': { '$exists': true } },
|
||||
{
|
||||
'value.nullVersionId': { '$exists': false },
|
||||
'value.versionId': { '$exists': false },
|
||||
Object.keys(res.masterData).forEach(loc => {
|
||||
const bytes = res.masterData[loc];
|
||||
const locName = loc === 'mem' || loc === 'file' ?
|
||||
'us-east-1' : loc;
|
||||
if (!locations[locName]) {
|
||||
locations[locName] = { curr: 0, prev: 0 };
|
||||
}
|
||||
total.curr += bytes;
|
||||
locations[locName].curr += bytes;
|
||||
if (isVersioned) {
|
||||
total.prev -= bytes;
|
||||
locations[locName].prev -= bytes;
|
||||
}
|
||||
});
|
||||
return {
|
||||
versions: isVersioned ?
|
||||
res.versionCount - res.masterCount : 0,
|
||||
objects: res.masterCount + res.nullCount,
|
||||
dataManaged: {
|
||||
total,
|
||||
locations,
|
||||
},
|
||||
],
|
||||
};
|
||||
/* eslint-enable quote-props */
|
||||
return async.series({
|
||||
master: done => {
|
||||
// query for getting only master version entries
|
||||
const mstFilter = [];
|
||||
/* eslint-disable quote-props */
|
||||
mstVerIds.forEach(id => {
|
||||
if (id) {
|
||||
mstFilter.push({ 'value.versionId': { '$eq': id } });
|
||||
}
|
||||
});
|
||||
const query = {
|
||||
'$and': [
|
||||
queryFilter,
|
||||
{ '$or': (mstFilter.length ? mstFilter : [{}]) },
|
||||
],
|
||||
};
|
||||
/* eslint-enable quote-props */
|
||||
return this.getDataManaged(c, query, log, done);
|
||||
},
|
||||
archived: done => {
|
||||
// query for getting only versioned entries
|
||||
const mstFilter = [];
|
||||
/* eslint-disable quote-props */
|
||||
mstVerIds.forEach(id => {
|
||||
if (id) {
|
||||
mstFilter.push({ 'value.versionId': { '$ne': id } });
|
||||
}
|
||||
});
|
||||
const query = {
|
||||
'$and': [
|
||||
queryFilter,
|
||||
{ '$and': (mstFilter.length ? mstFilter : [{}]) },
|
||||
],
|
||||
};
|
||||
/* eslint-enable quote-props */
|
||||
return this.getDataManaged(c, query, log, done);
|
||||
},
|
||||
}, (err, res) => {
|
||||
if (err) {
|
||||
log.error('error occured in mongo client', {
|
||||
method: 'getDataInfoVer',
|
||||
error: err,
|
||||
});
|
||||
return callback(err);
|
||||
}
|
||||
const dataManaged = {
|
||||
total: { curr: 0, prev: 0 },
|
||||
locations: {},
|
||||
};
|
||||
const mstData = res.master.dataManaged.locations;
|
||||
const verData = res.archived.dataManaged.locations;
|
||||
|
||||
// add total
|
||||
dataManaged.total.curr += res.master.dataManaged.total;
|
||||
dataManaged.total.prev += res.archived.dataManaged.total;
|
||||
|
||||
Object.keys(mstData).forEach(loc => {
|
||||
if (!dataManaged.locations[loc]) {
|
||||
dataManaged.locations[loc] = { prev: 0, curr: 0 };
|
||||
}
|
||||
dataManaged.locations[loc].curr = mstData[loc];
|
||||
});
|
||||
Object.keys(verData).forEach(loc => {
|
||||
if (!dataManaged.locations[loc]) {
|
||||
dataManaged.locations[loc] = { prev: 0, curr: 0 };
|
||||
}
|
||||
dataManaged.locations[loc].prev = verData[loc];
|
||||
});
|
||||
|
||||
return callback(null, {
|
||||
objects: res.master.objCount,
|
||||
versions: res.archived.objCount,
|
||||
dataManaged,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
getObjectMDStats(bucketName, bucketInfo, log, callback) {
|
||||
|
@ -1154,60 +1042,113 @@ class MongoClientInterface {
|
|||
name: bucketName,
|
||||
location: bucketInfo.getLocationConstraint(),
|
||||
};
|
||||
if (bucketInfo && bucketInfo._versioningConfiguration &&
|
||||
(bucketInfo._versioningConfiguration.Status === 'Suspended' ||
|
||||
bucketInfo._versioningConfiguration.Status === 'Enabled')) {
|
||||
// if versioning is enabled
|
||||
c.distinct('_id').then(keys => {
|
||||
const trimmedKeys = keys.map(key => key.replace(/\0.*$/g, ''));
|
||||
const uniqKeys = trimmedKeys.filter(
|
||||
(key, index, self) => self.indexOf(key) === index);
|
||||
// for each uniqKey get master version id
|
||||
return async.map(uniqKeys, (key, done) => {
|
||||
this.getLatestVersion(c, key, log, (err, mst) => {
|
||||
|
||||
const mstFilter = {
|
||||
'_id': { $regex: /^[^\0]+$/ },
|
||||
'value.versionId': { $exists: true },
|
||||
};
|
||||
const verFilter = { _id: { $regex: /\0/ } };
|
||||
const nullFilter = {
|
||||
'_id': { $regex: /^[^\0]+$/ },
|
||||
'value.versionId': { $exists: false },
|
||||
};
|
||||
|
||||
const _handleCount = (err, entry, cb) => {
|
||||
if (err) {
|
||||
if (err.NoSuchKey) {
|
||||
log.debug('NoSuchKey master info', {
|
||||
method: 'getObjectMDStats',
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, entry && entry.count > 0 ? entry.count : 0);
|
||||
};
|
||||
const _handleEntries = (err, entries, cb) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
const results = {};
|
||||
if (entries) {
|
||||
entries.forEach(entry => {
|
||||
results[entry._id] = entry.bytes;
|
||||
});
|
||||
}
|
||||
return cb(null, results);
|
||||
};
|
||||
|
||||
const _handleMongo = (cursor, cb) => async.parallel({
|
||||
count: done => {
|
||||
const tmpCursor = cursor.clone();
|
||||
tmpCursor.project({ _id: 1 })
|
||||
.group({
|
||||
_id: null,
|
||||
count: { $sum: 1 },
|
||||
}).next((err, res) => _handleCount(err, res, done));
|
||||
},
|
||||
data: done => {
|
||||
const tmpCursor = cursor.clone();
|
||||
tmpCursor.project({
|
||||
'value.dataStoreName': 1,
|
||||
'value.content-length': 1,
|
||||
}).group({
|
||||
_id: '$value.dataStoreName',
|
||||
bytes: { $sum: '$value.content-length' },
|
||||
}).toArray((err, res) => _handleEntries(err, res, done));
|
||||
},
|
||||
repData: done => {
|
||||
const tmpCursor = cursor.clone();
|
||||
tmpCursor.project({
|
||||
'value.replicationInfo.backends': 1,
|
||||
'value.content-length': 1,
|
||||
}).unwind('$value.replicationInfo.backends')
|
||||
.group({
|
||||
_id: '$value.replicationInfo.backends.site',
|
||||
bytes: { $sum: '$value.content-length' },
|
||||
}).toArray((err, res) => _handleEntries(err, res, done));
|
||||
},
|
||||
}, (err, res) => {
|
||||
if (err) {
|
||||
log.error('Error when processing mongo entries', {
|
||||
method: '_handleMongo',
|
||||
error: err,
|
||||
});
|
||||
return done();
|
||||
return cb(err);
|
||||
}
|
||||
log.error('unable to retrieve master info', {
|
||||
method: 'getObjectMDStats',
|
||||
error: err,
|
||||
});
|
||||
return done(err);
|
||||
const retResult = {
|
||||
count: res.count,
|
||||
data: Object.assign({}, res.data),
|
||||
};
|
||||
Object.keys(res.repData).forEach(site => {
|
||||
if (!retResult.data[site]) {
|
||||
retResult.data[site] = 0;
|
||||
}
|
||||
return done(null, mst.versionId);
|
||||
retResult.data[site] += res.repData[site];
|
||||
});
|
||||
}, (err, mstVerIds) => {
|
||||
return cb(null, retResult);
|
||||
});
|
||||
|
||||
const mstCursor = c.aggregate().match(mstFilter);
|
||||
const verCursor = c.aggregate().match(verFilter);
|
||||
const nullCursor = c.aggregate().match(nullFilter);
|
||||
async.parallel({
|
||||
version: done => _handleMongo(verCursor, done),
|
||||
null: done => _handleMongo(nullCursor, done),
|
||||
master: done => _handleMongo(mstCursor, done),
|
||||
}, (err, res) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return this.getDataInfoVer(c, mstVerIds, log,
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return callback(null, {
|
||||
bucketList: retBucketInfo,
|
||||
counter: res,
|
||||
const resObj = {
|
||||
masterCount: res.master.count || 0,
|
||||
masterData: res.master.data || {},
|
||||
nullCount: res.null.count || 0,
|
||||
nullData: res.null.data || {},
|
||||
versionCount: res.version.count || 0,
|
||||
versionData: res.version.data || {},
|
||||
};
|
||||
const bucketStatus = bucketInfo.getVersioningConfiguration();
|
||||
const isVer = (bucketStatus && (bucketStatus.Status === 'Enabled' ||
|
||||
bucketStatus.Status === 'Suspended'));
|
||||
const retResult = this._handleResults(resObj, isVer);
|
||||
retResult.bucketList = retBucketInfo;
|
||||
return callback(null, retResult);
|
||||
});
|
||||
});
|
||||
});
|
||||
}).catch(callback);
|
||||
} else {
|
||||
this.getDataInfoNoVer(c, log, (err, res) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return callback(null, {
|
||||
bucketList: retBucketInfo,
|
||||
counter: res,
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue