Compare commits

...

7 Commits

Author SHA1 Message Date
philipyoo 0b178b2d89 fix and console.logs 2019-06-13 15:23:32 -07:00
philipyoo d9cc1595a6 update 2019-06-13 13:42:40 -07:00
philipyoo 96124752f9 update 2019-06-13 13:35:11 -07:00
philipyoo b2ae1960db fix NoSuchKey 2019-06-13 10:42:49 -07:00
philipyoo 9a8ce7082d update logs 2019-06-13 10:22:28 -07:00
philipyoo d9b7e97f89 test 2019-06-13 10:21:04 -07:00
philipyoo 011537f8f0 initial 2019-06-13 10:19:14 -07:00
1 changed files with 451 additions and 30 deletions

View File

@ -34,8 +34,10 @@ const METASTORE = '__metastore';
const INFOSTORE = '__infostore';
const __UUID = 'uuid';
const PENSIEVE = 'PENSIEVE';
const __COUNT_ITEMS = 'countitems';
const ASYNC_REPAIR_TIMEOUT = 15000;
const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
// const itemScanRefreshDelay = 1000 * 60 * 60; // 1 hour
// const itemScanRefreshDelay = 1000 * 60; // 1 minute
const CONNECT_TIMEOUT_MS = 5000;
const initialInstanceID = process.env.INITIAL_INSTANCE_ID;
@ -96,7 +98,8 @@ class MongoClientInterface {
this.path = path;
this.replicationGroupId = replicationGroupId;
this.database = database;
this.lastItemScanTime = null;
// this.lastItemScanTime = null;
this.itemScanInProgress = false;
this.dataCount = new DataCounter();
if (config && config instanceof EventEmitter) {
this.config = config;
@ -1035,9 +1038,398 @@ class MongoClientInterface {
this.path : '/', cb);
}
readCountItems(log, cb) {
const i = this.getCollection(INFOSTORE);
i.findOne({
_id: __COUNT_ITEMS,
}, {}, (err, doc) => {
if (err) {
log.error('readCountItems: error reading count items', {
error: err.message,
});
return cb(errors.InternalError);
}
if (!doc) {
console.log('--> readCountItems NoSuchKey')
// default
const res = {
objects: 0,
versions: 0,
buckets: 0,
bucketList: [],
dataManaged: {
total: { curr: 0, prev: 0 },
byLocation: {},
},
stalled: 0,
};
return cb(null, res);
}
return cb(null, doc.value);
});
}
updateCountItems(value, log, cb) {
const i = this.getCollection(INFOSTORE);
i.update({
_id: __COUNT_ITEMS,
}, {
_id: __COUNT_ITEMS,
value,
}, {
upsert: true,
}, err => {
if (err) {
log.error('updateCountItems: error updating count items', {
error: err.message,
});
return cb(errors.InternalError);
}
return cb();
});
}
// _scanAllTheThings(bucketList, bucketCount, log) {
// console.log('--> start _scanAllTheThings')
// this.itemScanInProgress = true;
// const res = {
// objects: 0,
// versions: 0,
// buckets: bucketCount,
// bucketList,
// dataManaged: {
// total: { curr: 0, prev: 0 },
// byLocation: {},
// },
// stalled: 0,
// };
//
// const consolidateData = dataManaged => {
// if (dataManaged && dataManaged.locations && dataManaged.total) {
// const locations = dataManaged.locations;
// res.dataManaged.total.curr += dataManaged.total.curr;
// res.dataManaged.total.prev += dataManaged.total.prev;
// Object.keys(locations).forEach(site => {
// if (!res.dataManaged.byLocation[site]) {
// res.dataManaged.byLocation[site] =
// Object.assign({}, locations[site]);
// } else {
// res.dataManaged.byLocation[site].curr +=
// locations[site].curr;
// res.dataManaged.byLocation[site].prev +=
// locations[site].prev;
// }
// });
// }
// };
//
// async.eachLimit(bucketList, 10, (bucketInfo, next) => {
// const bucketName = bucketInfo.getName();
//
// return this.getObjectMDStats(bucketName, bucketInfo, log,
// (err, results) => {
// if (err) {
// return next(errors.InternalError);
// }
// if (results.dataManaged) {
// res.objects += results.objects;
// res.versions += results.versions;
// res.stalled += results.stalled;
// consolidateData(results.dataManaged);
// }
// return next();
// });
// }, err => {
// console.log('--> scan almost done')
// if (err) {
// log.error('error occurred scanning for count items');
// this.itemScanInProgress = false;
// return;
// }
//
// this.updateCountItems(res, log, err => {
// if (err) {
// return;
// }
// this.itemScanInProgress = false;
// });
//
// // 1. update __infostore
// // - if err, scan failed
// // 2. update internal flags
// });
// }
// countItemsOLD(log, cb) {
// // once we queue everything, we perform full scan
//
// let bucketCount = 0;
// const bucketInfos = [];
//
// this.db.listCollections().toArray((err, collInfos) => {
// // read infostore
// // async.mapLimit
// async.mapLimit(collInfos, 10, (value, next) => {
// if (value.name === METASTORE ||
// value.name === INFOSTORE ||
// value.name === USERSBUCKET ||
// value.name === PENSIEVE ||
// value.name.startsWith(constants.mpuBucketPrefix)
// ) {
// // skip
// return next();
// }
// bucketCount++;
// const bucketName = value.name;
// // FIXME: there is currently no way of distinguishing
// // master from versions and searching for VID_SEP
// // does not work because there cannot be null bytes
// // in $regex
//
// return this.getBucketAttributes(bucketName, log,
// (err, bucketInfo) => {
// if (err) {
// log.error('error occured in countItems', {
// method: 'countItems',
// error: err,
// });
// return next(errors.InternalError);
// }
// bucketInfos.push(bucketInfo);
// const retBucketInfo = {
// name: bucketName,
// location: bucketInfo.getLocationConstraint(),
// isVersioned:
// !!bucketInfo.getVersioningConfiguration(),
// ownerCanonicalId: bucketInfo.getOwner(),
// ingestion: bucketInfo.isIngestionBucket(),
// };
// return next(null, retBucketInfo);
// });
// }, (err, list) => {
// if (err) {
// return cb(err);
// }
//
// // remove undefined from list (i.e. METASTORE, USERSBUCKET)
// const retBucketInfoList = list.reduce((store, item) => {
// if (item !== undefined) {
// store.push(item);
// }
// return store;
// }, []);
//
// // TODO: check also this._scanInProgress
// const doFullScan = ((this.lastItemScanTime === null ||
// (Date.now() - this.lastItemScanTime) > itemScanRefreshDelay)
// && this.itemScanInProgress === false);
//
// return this.readCountItems(log, (err, result) => {
// if (err) {
// return cb(err);
// }
// // overwrite bucket info since we have latest
// /* eslint-disable */
// result.bucketList = retBucketInfoList;
// result.buckets = bucketCount;
// /* eslint-enable */
//
// if (doFullScan) {
// this._scanAllTheThings(retBucketInfoList, bucketCount,
// log);
// this.lastItemScanTime = Date.now();
// }
//
// console.log(results)
//
// return cb(null, result);
// });
// });
// });
// return undefined;
// }
_getBucketInfos(log, cb) {
let bucketCount = 0;
const bucketInfos = [];
console.log('start _getBucketInfos')
this.db.listCollections().toArray((err, collInfos) => {
if (err) {
log.error('could not get list of collections', {
method: '_getBucketInfos',
error: err,
});
return cb(err);
}
return async.eachLimit(collInfos, 10, (value, next) => {
if (value.name === METASTORE ||
value.name === INFOSTORE ||
value.name === USERSBUCKET ||
value.name === PENSIEVE ||
value.name.startsWith(constants.mpuBucketPrefix)
) {
// skip
return next();
}
bucketCount++;
const bucketName = value.name;
// FIXME: there is currently no way of distinguishing
// master from versions and searching for VID_SEP
// does not work because there cannot be null bytes
// in $regex
return this.getBucketAttributes(bucketName, log,
(err, bucketInfo) => {
if (err) {
log.error('error occured in countItems', {
method: '_getBucketInfos',
error: err,
});
return next(errors.InternalError);
}
bucketInfos.push(bucketInfo);
return next();
});
}, err => {
if (err) {
return cb(err);
}
return cb(null, {
bucketCount,
bucketInfos,
});
});
});
}
countItems(log, cb) {
const doFullScan = this.lastItemScanTime === null ||
(Date.now() - this.lastItemScanTime) > itemScanRefreshDelay;
console.log('start count items')
this._getBucketInfos(log, (err, res) => {
console.log(err)
console.log(res)
if (err) {
log.error('error getting bucket info', {
method: 'countItems',
error: err,
});
return cb(err);
}
const { bucketCount, bucketInfos } = res;
const retBucketInfos = bucketInfos.map(bucket => ({
name: bucket.getName(),
location: bucket.getLocationConstraint(),
isVersioned: !!bucket.getVersioningConfiguration(),
ownerCanonicalId: bucket.getOwner(),
ingestion: bucket.isIngestionBucket(),
}));
// const retBucketInfo = {
// name: bucketName,
// location: bucketInfo.getLocationConstraint(),
// isVersioned:
// !!bucketInfo.getVersioningConfiguration(),
// ownerCanonicalId: bucketInfo.getOwner(),
// ingestion: bucketInfo.isIngestionBucket(),
// };
// bucketList.push(retBucketInfo);
return this.readCountItems(log, (err, results) => {
console.log('=====')
console.log(err)
console.log(results)
if (err) {
return cb(err);
}
// overwrite bucket info since we have latest
/* eslint-disable */
results.bucketList = retBucketInfos;
results.buckets = bucketCount;
/* eslint-enable */
console.log(results);
return cb(null, results);
});
});
// let bucketCount = 0;
// const bucketList = [];
//
// this.db.listCollections().toArray((err, collInfos) => {
// async.eachLimit(collInfos, 10, (value, next) => {
// if (value.name === METASTORE ||
// value.name === INFOSTORE ||
// value.name === USERSBUCKET ||
// value.name === PENSIEVE ||
// value.name.startsWith(constants.mpuBucketPrefix)
// ) {
// // skip
// return next();
// }
// bucketCount++;
// const bucketName = value.name;
// // FIXME: there is currently no way of distinguishing
// // master from versions and searching for VID_SEP
// // does not work because there cannot be null bytes
// // in $regex
//
// return this.getBucketAttributes(bucketName, log,
// (err, bucketInfo) => {
// if (err) {
// log.error('error occured in countItems', {
// method: 'countItems',
// error: err,
// });
// return next(errors.InternalError);
// }
// const retBucketInfo = {
// name: bucketName,
// location: bucketInfo.getLocationConstraint(),
// isVersioned:
// !!bucketInfo.getVersioningConfiguration(),
// ownerCanonicalId: bucketInfo.getOwner(),
// ingestion: bucketInfo.isIngestionBucket(),
// };
// bucketList.push(retBucketInfo);
// return next(null, retBucketInfo);
// });
// }, err => {
// if (err) {
// return cb(err);
// }
//
// return this.readCountItems(log, (err, results) => {
// if (err) {
// return cb(err);
// }
// // overwrite bucket info since we have latest
// /* eslint-disable */
// results.bucketList = bucketList;
// results.buckets = bucketCount;
// /* eslint-enable */
//
// console.log(results);
//
// return cb(null, results);
// });
// });
// });
// return undefined;
}
scanItemCount(log, cb) {
if (this.itemScanInProgress) {
log.debug('scan is already in progress');
return cb(errors.OperationAborted);
}
this.itemScanInProgress = true;
const res = {
objects: 0,
@ -1092,8 +1484,8 @@ class MongoClientInterface {
next => this.getBucketAttributes(
bucketName, log, (err, bucketInfo) => {
if (err) {
log.error('error occured in countItems', {
method: 'countItems',
log.error('error getting bucket attributes', {
method: 'scanItemCount',
error: err,
});
return next(errors.InternalError);
@ -1109,13 +1501,15 @@ class MongoClientInterface {
res.bucketList.push(retBucketInfo);
return next(null, bucketInfo);
}),
(bucketInfo, next) => {
if (!doFullScan) {
return next(null, {});
(bucketInfo, next) => this._getIsTransient(bucketInfo, log,
(err, isTransient) => {
if (err) {
return next(err);
}
return this.getObjectMDStats(
bucketName, bucketInfo, log, next);
},
return next(null, bucketInfo, isTransient);
}),
(bucketInfo, isTransient, next) => this.getObjectMDStats(
bucketName, bucketInfo, isTransient, log, next),
], (err, results) => {
if (err) {
return next(errors.InternalError);
@ -1133,21 +1527,48 @@ class MongoClientInterface {
return cb(err);
}
if (!doFullScan) {
const cachedRes = this.dataCount.results();
cachedRes.bucketList = res.bucketList;
cachedRes.buckets = res.buckets;
return cb(null, cachedRes);
// save to infostore
return this.updateCountItems(res, log, err => {
this.itemScanInProgress = false;
if (err) {
log.error('error saving count items in mongo', {
method: 'scanItemCount',
error: err,
});
return cb(err);
}
this.lastItemScanTime = Date.now();
this.dataCount.set(res);
return cb(null, res);
});
});
});
return undefined;
}
_getIsTransient(bucketInfo, log, cb) {
const overlayVersionId = 'configuration/overlay-version';
async.waterfall([
next => this.getObject(PENSIEVE, overlayVersionId, {}, log, next),
(version, next) => {
const overlayConfigId = `configuration/overlay/${version}`;
return this.getObject(PENSIEVE, overlayConfigId, {}, log, next);
},
], (err, res) => {
if (err) {
log.error('error getting configuration overlay', {
method: '_getIsTransient',
error: err,
});
return cb(err);
}
const locConstraint = bucketInfo.getLocationConstraint();
const isTransient =
Boolean(res.locations[locConstraint].isTransient);
return cb(null, isTransient);
});
}
_getLocName(loc) {
return loc === 'mem' || loc === 'file' ? 'us-east-1' : loc;
}
@ -1367,14 +1788,14 @@ class MongoClientInterface {
});
}
getObjectMDStats(bucketName, bucketInfo, log, callback) {
getObjectMDStats(bucketName, bucketInfo, isTransient, log, callback) {
const c = this.getCollection(bucketName);
let isTransient;
if (this.config) {
isTransient = this.config
.getLocationConstraint(bucketInfo.getLocationConstraint())
.isTransient;
}
// TODO: base off PENSIEVE
// if (this.config) {
// isTransient = this.config
// .getLocationConstraint(bucketInfo.getLocationConstraint())
// .isTransient;
// }
const mstFilter = {
'_id': { $regex: /^[^\0]+$/ },
'value.versionId': { $exists: true },