Compare commits
1 Commits
developmen
...
ft/dataMan
Author | SHA1 | Date |
---|---|---|
Alexander Chan | 2c211b3d52 |
|
@ -0,0 +1,96 @@
|
||||||
|
function deepCopyObject(obj) {
|
||||||
|
return JSON.parse(JSON.stringify(obj));
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
To-do: parallel safety?
|
||||||
|
*/
|
||||||
|
class DataCounter {
|
||||||
|
constructor() {
|
||||||
|
this.objects = 0;
|
||||||
|
this.versions = 0;
|
||||||
|
this.buckets = 0;
|
||||||
|
this.bucketList = [];
|
||||||
|
this.dataManaged = {
|
||||||
|
total: { curr: 0, prev: 0 },
|
||||||
|
byLocation: {},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
set(setVal) {
|
||||||
|
if (setVal) {
|
||||||
|
this.objects = setVal.objects;
|
||||||
|
this.versions = setVal.versions;
|
||||||
|
this.buckets = setVal.buckets;
|
||||||
|
this.bucketList = [...setVal.bucketList];
|
||||||
|
this.dataManaged = deepCopyObject(setVal.dataManaged);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
addObject(objVal, preVal, isVersioned) {
|
||||||
|
// types of change
|
||||||
|
// new master, replace master, add to versioning case 1
|
||||||
|
// new master, replace master (suspended versioning) case 2
|
||||||
|
// new master, replace master (non-vesioning)
|
||||||
|
// new object, case 1
|
||||||
|
// new master, replace master (delete marker) case 1
|
||||||
|
if (preVal) {
|
||||||
|
if (isVersioned) {
|
||||||
|
++this.versions;
|
||||||
|
this.dataManaged.total.prev += preVal['content-length'];
|
||||||
|
}
|
||||||
|
this.dataManaged.total.curr -= preVal['content-length'];
|
||||||
|
preVal.locations.forEach(dataStoreName => {
|
||||||
|
if (this.dataManaged.byLocation[dataStoreName]) {
|
||||||
|
this.dataManaged.byLocation[dataStoreName].curr -=
|
||||||
|
preVal['content-length'];
|
||||||
|
if (isVersioned) {
|
||||||
|
this.dataManaged.byLocation[dataStoreName].prev +=
|
||||||
|
preVal['content-length'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
++this.objects;
|
||||||
|
}
|
||||||
|
if (!objVal.isDeleteMarker) {
|
||||||
|
this.dataManaged.total.curr += objVal['content-length'];
|
||||||
|
objVal.locations.forEach(dataStoreName => {
|
||||||
|
if (this.dataManaged.byLocation[dataStoreName]) {
|
||||||
|
this.dataManaged.byLocation[dataStoreName].curr +=
|
||||||
|
objVal['content-length'];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
delObject(objVal, isMaster) {
|
||||||
|
let type;
|
||||||
|
if (isMaster) {
|
||||||
|
--this.objects;
|
||||||
|
type = 'curr';
|
||||||
|
} else {
|
||||||
|
--this.versions;
|
||||||
|
type = 'prev';
|
||||||
|
}
|
||||||
|
this.dataManaged.total[type] -= objVal['content-length'];
|
||||||
|
objVal.locations.forEach(dataStoreName => {
|
||||||
|
if (this.dataManaged.byLocation[dataStoreName]) {
|
||||||
|
this.dataManaged.byLocation[dataStoreName][type] -=
|
||||||
|
objVal['content-length'];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
results() {
|
||||||
|
const obj = {
|
||||||
|
objects: this.objects,
|
||||||
|
versions: this.versions,
|
||||||
|
buckets: this.buckets,
|
||||||
|
bucketList: this.bucketList,
|
||||||
|
dataManaged: this.dataManaged,
|
||||||
|
};
|
||||||
|
return deepCopyObject(obj);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = DataCounter;
|
|
@ -23,6 +23,7 @@ const diskusage = require('diskusage');
|
||||||
const genVID = require('../../../versioning/VersionID').generateVersionId;
|
const genVID = require('../../../versioning/VersionID').generateVersionId;
|
||||||
const listAlgos = require('../../../algos/list/exportAlgos');
|
const listAlgos = require('../../../algos/list/exportAlgos');
|
||||||
|
|
||||||
|
const DataCounter = require('./DataCounter');
|
||||||
const MongoReadStream = require('./readStream');
|
const MongoReadStream = require('./readStream');
|
||||||
const MongoUtils = require('./utils');
|
const MongoUtils = require('./utils');
|
||||||
|
|
||||||
|
@ -94,7 +95,7 @@ class MongoClientInterface {
|
||||||
this.replicationGroupId = replicationGroupId;
|
this.replicationGroupId = replicationGroupId;
|
||||||
this.database = database;
|
this.database = database;
|
||||||
this.lastItemScanTime = null;
|
this.lastItemScanTime = null;
|
||||||
this.lastItemScanResult = null;
|
this.itemResult = new DataCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
setup(cb) {
|
setup(cb) {
|
||||||
|
@ -477,21 +478,59 @@ class MongoClientInterface {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
countWrapper(isVersioned, c, objName, objVal, log, cb) {
|
||||||
|
c.findOne({
|
||||||
|
_id: objName,
|
||||||
|
}, {}, (err, preVal) => {
|
||||||
|
if (err) {
|
||||||
|
log.error(err.message, {
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
}
|
||||||
|
if (preVal) {
|
||||||
|
this.itemResult.addObject(objVal, preVal.value, isVersioned);
|
||||||
|
} else {
|
||||||
|
this.itemResult.addObject(objVal, null, isVersioned);
|
||||||
|
}
|
||||||
|
return cb();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
putObject(bucketName, objName, objVal, params, log, cb) {
|
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||||
MongoUtils.serialize(objVal);
|
MongoUtils.serialize(objVal);
|
||||||
const c = this.getCollection(bucketName);
|
const c = this.getCollection(bucketName);
|
||||||
if (params && params.versioning && !params.versionId) {
|
if (params && params.versioning && !params.versionId) {
|
||||||
|
// put new version, replace master
|
||||||
|
return this.countWrapper(true, c, objName, objVal, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
return this.putObjectVerCase1(c, bucketName, objName, objVal,
|
return this.putObjectVerCase1(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
});
|
||||||
} else if (params && params.versionId === '') {
|
} else if (params && params.versionId === '') {
|
||||||
|
// replace master (suspended versioning)
|
||||||
|
return this.countWrapper(false, c, objName, objVal, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
return this.putObjectVerCase2(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
});
|
||||||
} else if (params && params.versionId) {
|
} else if (params && params.versionId) {
|
||||||
|
// update versionId document
|
||||||
|
// nothing to change
|
||||||
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
return this.putObjectVerCase3(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
}
|
}
|
||||||
|
return this.countWrapper(false, c, objName, objVal, log, err => {
|
||||||
|
if (err) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
return this.putObjectNoVer(c, bucketName, objName, objVal,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
getObject(bucketName, objName, params, log, cb) {
|
getObject(bucketName, objName, params, log, cb) {
|
||||||
|
@ -741,9 +780,11 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
if (mst.value.isPHD ||
|
if (mst.value.isPHD ||
|
||||||
mst.value.versionId === params.versionId) {
|
mst.value.versionId === params.versionId) {
|
||||||
|
this.itemResult.delObject(mst.value, true);
|
||||||
return this.deleteObjectVerMaster(c, bucketName, objName,
|
return this.deleteObjectVerMaster(c, bucketName, objName,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
}
|
}
|
||||||
|
this.itemResult.delObject(mst.value, false);
|
||||||
return this.deleteObjectVerNotMaster(c, bucketName, objName,
|
return this.deleteObjectVerNotMaster(c, bucketName, objName,
|
||||||
params, log, cb);
|
params, log, cb);
|
||||||
});
|
});
|
||||||
|
@ -768,6 +809,7 @@ class MongoClientInterface {
|
||||||
{ error: err.message });
|
{ error: err.message });
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
|
this.itemResult.delObj(result.value, true);
|
||||||
return cb(null);
|
return cb(null);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -896,7 +938,7 @@ class MongoClientInterface {
|
||||||
countItems(log, cb) {
|
countItems(log, cb) {
|
||||||
if (this.lastItemScanTime !== null &&
|
if (this.lastItemScanTime !== null &&
|
||||||
(Date.now() - this.lastItemScanTime) <= itemScanRefreshDelay) {
|
(Date.now() - this.lastItemScanTime) <= itemScanRefreshDelay) {
|
||||||
return process.nextTick(cb, null, this.lastItemScanResult);
|
return process.nextTick(cb, null, this.itemResult.result());
|
||||||
}
|
}
|
||||||
|
|
||||||
const res = {
|
const res = {
|
||||||
|
@ -979,7 +1021,7 @@ class MongoClientInterface {
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
this.lastItemScanTime = Date.now();
|
this.lastItemScanTime = Date.now();
|
||||||
this.lastItemScanResult = res;
|
this.itemResult.set(res);
|
||||||
return cb(null, res);
|
return cb(null, res);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue