Compare commits
7 Commits
developmen
...
test/mongo
Author | SHA1 | Date |
---|---|---|
philipyoo | 0b178b2d89 | |
philipyoo | d9cc1595a6 | |
philipyoo | 96124752f9 | |
philipyoo | b2ae1960db | |
philipyoo | 9a8ce7082d | |
philipyoo | d9b7e97f89 | |
philipyoo | 011537f8f0 |
|
@ -34,8 +34,10 @@ const METASTORE = '__metastore';
|
|||
const INFOSTORE = '__infostore';
|
||||
const __UUID = 'uuid';
|
||||
const PENSIEVE = 'PENSIEVE';
|
||||
const __COUNT_ITEMS = 'countitems';
|
||||
const ASYNC_REPAIR_TIMEOUT = 15000;
|
||||
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
||||
// const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
|
||||
// const itemScanRefreshDelay = 1000 * 60; // 1 minute
|
||||
const CONNECT_TIMEOUT_MS = 5000;
|
||||
|
||||
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
|
||||
|
@ -96,7 +98,8 @@ class MongoClientInterface {
|
|||
this.path = path;
|
||||
this.replicationGroupId = replicationGroupId;
|
||||
this.database = database;
|
||||
this.lastItemScanTime = null;
|
||||
// this.lastItemScanTime = null;
|
||||
this.itemScanInProgress = false;
|
||||
this.dataCount = new DataCounter();
|
||||
if (config && config instanceof EventEmitter) {
|
||||
this.config = config;
|
||||
|
@ -1035,9 +1038,398 @@ class MongoClientInterface {
|
|||
this.path : '/', cb);
|
||||
}
|
||||
|
||||
readCountItems(log, cb) {
|
||||
const i = this.getCollection(INFOSTORE);
|
||||
i.findOne({
|
||||
_id: __COUNT_ITEMS,
|
||||
}, {}, (err, doc) => {
|
||||
if (err) {
|
||||
log.error('readCountItems: error reading count items', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (!doc) {
|
||||
console.log('--> readCountItems NoSuchKey')
|
||||
// default
|
||||
const res = {
|
||||
objects: 0,
|
||||
versions: 0,
|
||||
buckets: 0,
|
||||
bucketList: [],
|
||||
dataManaged: {
|
||||
total: { curr: 0, prev: 0 },
|
||||
byLocation: {},
|
||||
},
|
||||
stalled: 0,
|
||||
};
|
||||
return cb(null, res);
|
||||
}
|
||||
return cb(null, doc.value);
|
||||
});
|
||||
}
|
||||
|
||||
updateCountItems(value, log, cb) {
|
||||
const i = this.getCollection(INFOSTORE);
|
||||
i.update({
|
||||
_id: __COUNT_ITEMS,
|
||||
}, {
|
||||
_id: __COUNT_ITEMS,
|
||||
value,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, err => {
|
||||
if (err) {
|
||||
log.error('updateCountItems: error updating count items', {
|
||||
error: err.message,
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
||||
// _scanAllTheThings(bucketList, bucketCount, log) {
|
||||
// console.log('--> start _scanAllTheThings')
|
||||
// this.itemScanInProgress = true;
|
||||
// const res = {
|
||||
// objects: 0,
|
||||
// versions: 0,
|
||||
// buckets: bucketCount,
|
||||
// bucketList,
|
||||
// dataManaged: {
|
||||
// total: { curr: 0, prev: 0 },
|
||||
// byLocation: {},
|
||||
// },
|
||||
// stalled: 0,
|
||||
// };
|
||||
//
|
||||
// const consolidateData = dataManaged => {
|
||||
// if (dataManaged && dataManaged.locations && dataManaged.total) {
|
||||
// const locations = dataManaged.locations;
|
||||
// res.dataManaged.total.curr += dataManaged.total.curr;
|
||||
// res.dataManaged.total.prev += dataManaged.total.prev;
|
||||
// Object.keys(locations).forEach(site => {
|
||||
// if (!res.dataManaged.byLocation[site]) {
|
||||
// res.dataManaged.byLocation[site] =
|
||||
// Object.assign({}, locations[site]);
|
||||
// } else {
|
||||
// res.dataManaged.byLocation[site].curr +=
|
||||
// locations[site].curr;
|
||||
// res.dataManaged.byLocation[site].prev +=
|
||||
// locations[site].prev;
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// };
|
||||
//
|
||||
// async.eachLimit(bucketList, 10, (bucketInfo, next) => {
|
||||
// const bucketName = bucketInfo.getName();
|
||||
//
|
||||
// return this.getObjectMDStats(bucketName, bucketInfo, log,
|
||||
// (err, results) => {
|
||||
// if (err) {
|
||||
// return next(errors.InternalError);
|
||||
// }
|
||||
// if (results.dataManaged) {
|
||||
// res.objects += results.objects;
|
||||
// res.versions += results.versions;
|
||||
// res.stalled += results.stalled;
|
||||
// consolidateData(results.dataManaged);
|
||||
// }
|
||||
// return next();
|
||||
// });
|
||||
// }, err => {
|
||||
// console.log('--> scan almost done')
|
||||
// if (err) {
|
||||
// log.error('error occurred scanning for count items');
|
||||
// this.itemScanInProgress = false;
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// this.updateCountItems(res, log, err => {
|
||||
// if (err) {
|
||||
// return;
|
||||
// }
|
||||
// this.itemScanInProgress = false;
|
||||
// });
|
||||
//
|
||||
// // 1. update __infostore
|
||||
// // - if err, scan failed
|
||||
// // 2. update internal flags
|
||||
// });
|
||||
// }
|
||||
|
||||
// countItemsOLD(log, cb) {
|
||||
// // once we queue everything, we perform full scan
|
||||
//
|
||||
// let bucketCount = 0;
|
||||
// const bucketInfos = [];
|
||||
//
|
||||
// this.db.listCollections().toArray((err, collInfos) => {
|
||||
// // read infostore
|
||||
// // async.mapLimit
|
||||
// async.mapLimit(collInfos, 10, (value, next) => {
|
||||
// if (value.name === METASTORE ||
|
||||
// value.name === INFOSTORE ||
|
||||
// value.name === USERSBUCKET ||
|
||||
// value.name === PENSIEVE ||
|
||||
// value.name.startsWith(constants.mpuBucketPrefix)
|
||||
// ) {
|
||||
// // skip
|
||||
// return next();
|
||||
// }
|
||||
// bucketCount++;
|
||||
// const bucketName = value.name;
|
||||
// // FIXME: there is currently no way of distinguishing
|
||||
// // master from versions and searching for VID_SEP
|
||||
// // does not work because there cannot be null bytes
|
||||
// // in $regex
|
||||
//
|
||||
// return this.getBucketAttributes(bucketName, log,
|
||||
// (err, bucketInfo) => {
|
||||
// if (err) {
|
||||
// log.error('error occured in countItems', {
|
||||
// method: 'countItems',
|
||||
// error: err,
|
||||
// });
|
||||
// return next(errors.InternalError);
|
||||
// }
|
||||
// bucketInfos.push(bucketInfo);
|
||||
// const retBucketInfo = {
|
||||
// name: bucketName,
|
||||
// location: bucketInfo.getLocationConstraint(),
|
||||
// isVersioned:
|
||||
// !!bucketInfo.getVersioningConfiguration(),
|
||||
// ownerCanonicalId: bucketInfo.getOwner(),
|
||||
// ingestion: bucketInfo.isIngestionBucket(),
|
||||
// };
|
||||
// return next(null, retBucketInfo);
|
||||
// });
|
||||
// }, (err, list) => {
|
||||
// if (err) {
|
||||
// return cb(err);
|
||||
// }
|
||||
//
|
||||
// // remove undefined from list (i.e. METASTORE, USERSBUCKET)
|
||||
// const retBucketInfoList = list.reduce((store, item) => {
|
||||
// if (item !== undefined) {
|
||||
// store.push(item);
|
||||
// }
|
||||
// return store;
|
||||
// }, []);
|
||||
//
|
||||
// // TODO: check also this._scanInProgress
|
||||
// const doFullScan = ((this.lastItemScanTime === null ||
|
||||
// (Date.now() - this.lastItemScanTime) > itemScanRefreshDelay)
|
||||
// && this.itemScanInProgress === false);
|
||||
//
|
||||
// return this.readCountItems(log, (err, result) => {
|
||||
// if (err) {
|
||||
// return cb(err);
|
||||
// }
|
||||
// // overwrite bucket info since we have latest
|
||||
// /* eslint-disable */
|
||||
// result.bucketList = retBucketInfoList;
|
||||
// result.buckets = bucketCount;
|
||||
// /* eslint-enable */
|
||||
//
|
||||
// if (doFullScan) {
|
||||
// this._scanAllTheThings(retBucketInfoList, bucketCount,
|
||||
// log);
|
||||
// this.lastItemScanTime = Date.now();
|
||||
// }
|
||||
//
|
||||
// console.log(results)
|
||||
//
|
||||
// return cb(null, result);
|
||||
// });
|
||||
// });
|
||||
// });
|
||||
// return undefined;
|
||||
// }
|
||||
|
||||
_getBucketInfos(log, cb) {
|
||||
let bucketCount = 0;
|
||||
const bucketInfos = [];
|
||||
|
||||
console.log('start _getBucketInfos')
|
||||
|
||||
this.db.listCollections().toArray((err, collInfos) => {
|
||||
if (err) {
|
||||
log.error('could not get list of collections', {
|
||||
method: '_getBucketInfos',
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
return async.eachLimit(collInfos, 10, (value, next) => {
|
||||
if (value.name === METASTORE ||
|
||||
value.name === INFOSTORE ||
|
||||
value.name === USERSBUCKET ||
|
||||
value.name === PENSIEVE ||
|
||||
value.name.startsWith(constants.mpuBucketPrefix)
|
||||
) {
|
||||
// skip
|
||||
return next();
|
||||
}
|
||||
bucketCount++;
|
||||
const bucketName = value.name;
|
||||
// FIXME: there is currently no way of distinguishing
|
||||
// master from versions and searching for VID_SEP
|
||||
// does not work because there cannot be null bytes
|
||||
// in $regex
|
||||
|
||||
return this.getBucketAttributes(bucketName, log,
|
||||
(err, bucketInfo) => {
|
||||
if (err) {
|
||||
log.error('error occured in countItems', {
|
||||
method: '_getBucketInfos',
|
||||
error: err,
|
||||
});
|
||||
return next(errors.InternalError);
|
||||
}
|
||||
bucketInfos.push(bucketInfo);
|
||||
return next();
|
||||
});
|
||||
}, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, {
|
||||
bucketCount,
|
||||
bucketInfos,
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
countItems(log, cb) {
|
||||
const doFullScan = this.lastItemScanTime === null ||
|
||||
(Date.now() - this.lastItemScanTime) > itemScanRefreshDelay;
|
||||
console.log('start count items')
|
||||
this._getBucketInfos(log, (err, res) => {
|
||||
console.log(err)
|
||||
console.log(res)
|
||||
if (err) {
|
||||
log.error('error getting bucket info', {
|
||||
method: 'countItems',
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
const { bucketCount, bucketInfos } = res;
|
||||
|
||||
const retBucketInfos = bucketInfos.map(bucket => ({
|
||||
name: bucket.getName(),
|
||||
location: bucket.getLocationConstraint(),
|
||||
isVersioned: !!bucket.getVersioningConfiguration(),
|
||||
ownerCanonicalId: bucket.getOwner(),
|
||||
ingestion: bucket.isIngestionBucket(),
|
||||
}));
|
||||
|
||||
// const retBucketInfo = {
|
||||
// name: bucketName,
|
||||
// location: bucketInfo.getLocationConstraint(),
|
||||
// isVersioned:
|
||||
// !!bucketInfo.getVersioningConfiguration(),
|
||||
// ownerCanonicalId: bucketInfo.getOwner(),
|
||||
// ingestion: bucketInfo.isIngestionBucket(),
|
||||
// };
|
||||
// bucketList.push(retBucketInfo);
|
||||
|
||||
return this.readCountItems(log, (err, results) => {
|
||||
console.log('=====')
|
||||
console.log(err)
|
||||
console.log(results)
|
||||
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
// overwrite bucket info since we have latest
|
||||
/* eslint-disable */
|
||||
results.bucketList = retBucketInfos;
|
||||
results.buckets = bucketCount;
|
||||
/* eslint-enable */
|
||||
|
||||
console.log(results);
|
||||
|
||||
return cb(null, results);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
|
||||
// let bucketCount = 0;
|
||||
// const bucketList = [];
|
||||
//
|
||||
// this.db.listCollections().toArray((err, collInfos) => {
|
||||
// async.eachLimit(collInfos, 10, (value, next) => {
|
||||
// if (value.name === METASTORE ||
|
||||
// value.name === INFOSTORE ||
|
||||
// value.name === USERSBUCKET ||
|
||||
// value.name === PENSIEVE ||
|
||||
// value.name.startsWith(constants.mpuBucketPrefix)
|
||||
// ) {
|
||||
// // skip
|
||||
// return next();
|
||||
// }
|
||||
// bucketCount++;
|
||||
// const bucketName = value.name;
|
||||
// // FIXME: there is currently no way of distinguishing
|
||||
// // master from versions and searching for VID_SEP
|
||||
// // does not work because there cannot be null bytes
|
||||
// // in $regex
|
||||
//
|
||||
// return this.getBucketAttributes(bucketName, log,
|
||||
// (err, bucketInfo) => {
|
||||
// if (err) {
|
||||
// log.error('error occured in countItems', {
|
||||
// method: 'countItems',
|
||||
// error: err,
|
||||
// });
|
||||
// return next(errors.InternalError);
|
||||
// }
|
||||
// const retBucketInfo = {
|
||||
// name: bucketName,
|
||||
// location: bucketInfo.getLocationConstraint(),
|
||||
// isVersioned:
|
||||
// !!bucketInfo.getVersioningConfiguration(),
|
||||
// ownerCanonicalId: bucketInfo.getOwner(),
|
||||
// ingestion: bucketInfo.isIngestionBucket(),
|
||||
// };
|
||||
// bucketList.push(retBucketInfo);
|
||||
// return next(null, retBucketInfo);
|
||||
// });
|
||||
// }, err => {
|
||||
// if (err) {
|
||||
// return cb(err);
|
||||
// }
|
||||
//
|
||||
// return this.readCountItems(log, (err, results) => {
|
||||
// if (err) {
|
||||
// return cb(err);
|
||||
// }
|
||||
// // overwrite bucket info since we have latest
|
||||
// /* eslint-disable */
|
||||
// results.bucketList = bucketList;
|
||||
// results.buckets = bucketCount;
|
||||
// /* eslint-enable */
|
||||
//
|
||||
// console.log(results);
|
||||
//
|
||||
// return cb(null, results);
|
||||
// });
|
||||
// });
|
||||
// });
|
||||
// return undefined;
|
||||
}
|
||||
|
||||
scanItemCount(log, cb) {
|
||||
if (this.itemScanInProgress) {
|
||||
log.debug('scan is already in progress');
|
||||
return cb(errors.OperationAborted);
|
||||
}
|
||||
|
||||
this.itemScanInProgress = true;
|
||||
|
||||
const res = {
|
||||
objects: 0,
|
||||
|
@ -1092,8 +1484,8 @@ class MongoClientInterface {
|
|||
next => this.getBucketAttributes(
|
||||
bucketName, log, (err, bucketInfo) => {
|
||||
if (err) {
|
||||
log.error('error occured in countItems', {
|
||||
method: 'countItems',
|
||||
log.error('error getting bucket attributes', {
|
||||
method: 'scanItemCount',
|
||||
error: err,
|
||||
});
|
||||
return next(errors.InternalError);
|
||||
|
@ -1109,13 +1501,15 @@ class MongoClientInterface {
|
|||
res.bucketList.push(retBucketInfo);
|
||||
return next(null, bucketInfo);
|
||||
}),
|
||||
(bucketInfo, next) => {
|
||||
if (!doFullScan) {
|
||||
return next(null, {});
|
||||
(bucketInfo, next) => this._getIsTransient(bucketInfo, log,
|
||||
(err, isTransient) => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
return this.getObjectMDStats(
|
||||
bucketName, bucketInfo, log, next);
|
||||
},
|
||||
return next(null, bucketInfo, isTransient);
|
||||
}),
|
||||
(bucketInfo, isTransient, next) => this.getObjectMDStats(
|
||||
bucketName, bucketInfo, isTransient, log, next),
|
||||
], (err, results) => {
|
||||
if (err) {
|
||||
return next(errors.InternalError);
|
||||
|
@ -1133,21 +1527,48 @@ class MongoClientInterface {
|
|||
return cb(err);
|
||||
}
|
||||
|
||||
if (!doFullScan) {
|
||||
const cachedRes = this.dataCount.results();
|
||||
cachedRes.bucketList = res.bucketList;
|
||||
cachedRes.buckets = res.buckets;
|
||||
return cb(null, cachedRes);
|
||||
// save to infostore
|
||||
return this.updateCountItems(res, log, err => {
|
||||
this.itemScanInProgress = false;
|
||||
if (err) {
|
||||
log.error('error saving count items in mongo', {
|
||||
method: 'scanItemCount',
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
|
||||
this.lastItemScanTime = Date.now();
|
||||
this.dataCount.set(res);
|
||||
return cb(null, res);
|
||||
});
|
||||
});
|
||||
});
|
||||
return undefined;
|
||||
}
|
||||
|
||||
_getIsTransient(bucketInfo, log, cb) {
|
||||
const overlayVersionId = 'configuration/overlay-version';
|
||||
|
||||
async.waterfall([
|
||||
next => this.getObject(PENSIEVE, overlayVersionId, {}, log, next),
|
||||
(version, next) => {
|
||||
const overlayConfigId = `configuration/overlay/${version}`;
|
||||
return this.getObject(PENSIEVE, overlayConfigId, {}, log, next);
|
||||
},
|
||||
], (err, res) => {
|
||||
if (err) {
|
||||
log.error('error getting configuration overlay', {
|
||||
method: '_getIsTransient',
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
const locConstraint = bucketInfo.getLocationConstraint();
|
||||
const isTransient =
|
||||
Boolean(res.locations[locConstraint].isTransient);
|
||||
|
||||
return cb(null, isTransient);
|
||||
});
|
||||
}
|
||||
|
||||
_getLocName(loc) {
|
||||
return loc === 'mem' || loc === 'file' ? 'us-east-1' : loc;
|
||||
}
|
||||
|
@ -1367,14 +1788,14 @@ class MongoClientInterface {
|
|||
});
|
||||
}
|
||||
|
||||
getObjectMDStats(bucketName, bucketInfo, log, callback) {
|
||||
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
|
||||
const c = this.getCollection(bucketName);
|
||||
let isTransient;
|
||||
if (this.config) {
|
||||
isTransient = this.config
|
||||
.getLocationConstraint(bucketInfo.getLocationConstraint())
|
||||
.isTransient;
|
||||
}
|
||||
// TODO: base off PENSIEVE
|
||||
// if (this.config) {
|
||||
// isTransient = this.config
|
||||
// .getLocationConstraint(bucketInfo.getLocationConstraint())
|
||||
// .isTransient;
|
||||
// }
|
||||
const mstFilter = {
|
||||
'_id': { $regex: /^[^\0]+$/ },
|
||||
'value.versionId': { $exists: true },
|
||||
|
|
Loading…
Reference in New Issue