Compare commits

...

1 Commits

Author SHA1 Message Date
Alexander Chan 2c211b3d52 ft: data managed cached 2018-05-16 14:54:53 -07:00
3 changed files with 147 additions and 9 deletions

View File

@ -0,0 +1,96 @@
function deepCopyObject(obj) {
return JSON.parse(JSON.stringify(obj));
}
/*
To-do: parallel safety?
*/
class DataCounter {
constructor() {
this.objects = 0;
this.versions = 0;
this.buckets = 0;
this.bucketList = [];
this.dataManaged = {
total: { curr: 0, prev: 0 },
byLocation: {},
};
}
set(setVal) {
if (setVal) {
this.objects = setVal.objects;
this.versions = setVal.versions;
this.buckets = setVal.buckets;
this.bucketList = [...setVal.bucketList];
this.dataManaged = deepCopyObject(setVal.dataManaged);
}
}
addObject(objVal, preVal, isVersioned) {
// types of change
// new master, replace master, add to versioning case 1
// new master, replace master (suspended versioning) case 2
// new master, replace master (non-vesioning)
// new object, case 1
// new master, replace master (delete marker) case 1
if (preVal) {
if (isVersioned) {
++this.versions;
this.dataManaged.total.prev += preVal['content-length'];
}
this.dataManaged.total.curr -= preVal['content-length'];
preVal.locations.forEach(dataStoreName => {
if (this.dataManaged.byLocation[dataStoreName]) {
this.dataManaged.byLocation[dataStoreName].curr -=
preVal['content-length'];
if (isVersioned) {
this.dataManaged.byLocation[dataStoreName].prev +=
preVal['content-length'];
}
}
});
} else {
++this.objects;
}
if (!objVal.isDeleteMarker) {
this.dataManaged.total.curr += objVal['content-length'];
objVal.locations.forEach(dataStoreName => {
if (this.dataManaged.byLocation[dataStoreName]) {
this.dataManaged.byLocation[dataStoreName].curr +=
objVal['content-length'];
}
});
}
}
delObject(objVal, isMaster) {
let type;
if (isMaster) {
--this.objects;
type = 'curr';
} else {
--this.versions;
type = 'prev';
}
this.dataManaged.total[type] -= objVal['content-length'];
objVal.locations.forEach(dataStoreName => {
if (this.dataManaged.byLocation[dataStoreName]) {
this.dataManaged.byLocation[dataStoreName][type] -=
objVal['content-length'];
}
});
}
results() {
const obj = {
objects: this.objects,
versions: this.versions,
buckets: this.buckets,
bucketList: this.bucketList,
dataManaged: this.dataManaged,
};
return deepCopyObject(obj);
}
}
module.exports = DataCounter;

View File

@ -23,6 +23,7 @@ const diskusage = require('diskusage');
const genVID = require('../../../versioning/VersionID').generateVersionId;
const listAlgos = require('../../../algos/list/exportAlgos');
const DataCounter = require('./DataCounter');
const MongoReadStream = require('./readStream');
const MongoUtils = require('./utils');
@ -94,7 +95,7 @@ class MongoClientInterface {
this.replicationGroupId = replicationGroupId;
this.database = database;
this.lastItemScanTime = null;
this.lastItemScanResult = null;
this.itemResult = new DataCounter();
}
setup(cb) {
@ -477,21 +478,59 @@ class MongoClientInterface {
});
}
countWrapper(isVersioned, c, objName, objVal, log, cb) {
c.findOne({
_id: objName,
}, {}, (err, preVal) => {
if (err) {
log.error(err.message, {
error: err,
});
return cb(errors.InternalError);
}
if (preVal) {
this.itemResult.addObject(objVal, preVal.value, isVersioned);
} else {
this.itemResult.addObject(objVal, null, isVersioned);
}
return cb();
});
}
putObject(bucketName, objName, objVal, params, log, cb) {
MongoUtils.serialize(objVal);
const c = this.getCollection(bucketName);
if (params && params.versioning && !params.versionId) {
return this.putObjectVerCase1(c, bucketName, objName, objVal,
params, log, cb);
// put new version, replace master
return this.countWrapper(true, c, objName, objVal, log, err => {
if (err) {
return cb(err);
}
return this.putObjectVerCase1(c, bucketName, objName, objVal,
params, log, cb);
});
} else if (params && params.versionId === '') {
return this.putObjectVerCase2(c, bucketName, objName, objVal,
params, log, cb);
// replace master (suspended versioning)
return this.countWrapper(false, c, objName, objVal, log, err => {
if (err) {
return cb(err);
}
return this.putObjectVerCase2(c, bucketName, objName, objVal,
params, log, cb);
});
} else if (params && params.versionId) {
// update versionId document
// nothing to change
return this.putObjectVerCase3(c, bucketName, objName, objVal,
params, log, cb);
}
return this.putObjectNoVer(c, bucketName, objName, objVal,
params, log, cb);
return this.countWrapper(false, c, objName, objVal, log, err => {
if (err) {
return cb(err);
}
return this.putObjectNoVer(c, bucketName, objName, objVal,
params, log, cb);
});
}
getObject(bucketName, objName, params, log, cb) {
@ -741,9 +780,11 @@ class MongoClientInterface {
}
if (mst.value.isPHD ||
mst.value.versionId === params.versionId) {
this.itemResult.delObject(mst.value, true);
return this.deleteObjectVerMaster(c, bucketName, objName,
params, log, cb);
}
this.itemResult.delObject(mst.value, false);
return this.deleteObjectVerNotMaster(c, bucketName, objName,
params, log, cb);
});
@ -768,6 +809,7 @@ class MongoClientInterface {
{ error: err.message });
return cb(errors.InternalError);
}
this.itemResult.delObj(result.value, true);
return cb(null);
});
}
@ -896,7 +938,7 @@ class MongoClientInterface {
countItems(log, cb) {
if (this.lastItemScanTime !== null &&
(Date.now() - this.lastItemScanTime) <= itemScanRefreshDelay) {
return process.nextTick(cb, null, this.lastItemScanResult);
return process.nextTick(cb, null, this.itemResult.result());
}
const res = {
@ -979,7 +1021,7 @@ class MongoClientInterface {
return cb(err);
}
this.lastItemScanTime = Date.now();
this.lastItemScanResult = res;
this.itemResult.set(res);
return cb(null, res);
});
});

View File