Compare commits
15 Commits
developmen
...
task/arsen
Author | SHA1 | Date |
---|---|---|
williamlardier | df7d30c164 | |
williamlardier | ad19118d2c | |
williamlardier | dbd6f439db | |
williamlardier | b2b133b824 | |
williamlardier | 9ee0da7084 | |
williamlardier | 61f26ffe02 | |
williamlardier | 952fe9197a | |
williamlardier | 5ba1cd697b | |
williamlardier | 497c76dfd7 | |
williamlardier | 3595e7ad7b | |
williamlardier | a02b874942 | |
williamlardier | 48ae3f966e | |
williamlardier | 509fbc8bf8 | |
williamlardier | 4c3e4108a5 | |
williamlardier | 206f4f412d |
|
@ -279,10 +279,11 @@ class MultipleBackendGateway {
|
||||||
}
|
}
|
||||||
|
|
||||||
objectTagging(method, key, bucket, objectMD, log, cb) {
|
objectTagging(method, key, bucket, objectMD, log, cb) {
|
||||||
|
console.log('>> objectTagging', method, key, bucket, objectMD);
|
||||||
// if legacy, objectMD will not contain dataStoreName, so just return
|
// if legacy, objectMD will not contain dataStoreName, so just return
|
||||||
const client = this.clients[objectMD.dataStoreName];
|
const client = this.clients[objectMD.dataStoreName];
|
||||||
if (client && client[`object${method}Tagging`]) {
|
if (client && client[`object${method}Tagging`]) {
|
||||||
return client[`object${method}Tagging`](key, bucket, objectMD, log,
|
return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log,
|
||||||
cb);
|
cb);
|
||||||
}
|
}
|
||||||
return cb();
|
return cb();
|
||||||
|
|
|
@ -69,6 +69,7 @@ class AwsClient {
|
||||||
|
|
||||||
_createAwsKey(requestBucketName, requestObjectKey,
|
_createAwsKey(requestBucketName, requestObjectKey,
|
||||||
bucketMatch) {
|
bucketMatch) {
|
||||||
|
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
|
||||||
if (bucketMatch) {
|
if (bucketMatch) {
|
||||||
return requestObjectKey;
|
return requestObjectKey;
|
||||||
}
|
}
|
||||||
|
@ -489,14 +490,17 @@ class AwsClient {
|
||||||
}
|
}
|
||||||
|
|
||||||
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
||||||
|
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
|
||||||
const awsBucket = this._awsBucketName;
|
const awsBucket = this._awsBucketName;
|
||||||
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
||||||
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
||||||
|
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
|
||||||
const tagParams = {
|
const tagParams = {
|
||||||
Bucket: awsBucket,
|
Bucket: awsBucket,
|
||||||
Key: awsKey,
|
Key: awsKey,
|
||||||
VersionId: dataStoreVersionId,
|
VersionId: dataStoreVersionId,
|
||||||
};
|
};
|
||||||
|
console.log('2 >>', tagParams);
|
||||||
const keyArray = Object.keys(objectMD.tags);
|
const keyArray = Object.keys(objectMD.tags);
|
||||||
tagParams.Tagging = {};
|
tagParams.Tagging = {};
|
||||||
tagParams.Tagging.TagSet = keyArray.map(key => {
|
tagParams.Tagging.TagSet = keyArray.map(key => {
|
||||||
|
@ -505,6 +509,7 @@ class AwsClient {
|
||||||
});
|
});
|
||||||
return this._client.putObjectTagging(tagParams, err => {
|
return this._client.putObjectTagging(tagParams, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
console.log('ERROR!! >>', err);
|
||||||
logHelper(log, 'error', 'error from data backend on ' +
|
logHelper(log, 'error', 'error from data backend on ' +
|
||||||
'putObjectTagging', err,
|
'putObjectTagging', err,
|
||||||
this._dataStoreName, this.clientType);
|
this._dataStoreName, this.clientType);
|
||||||
|
|
|
@ -144,10 +144,19 @@ class MongoClientInterface {
|
||||||
socketTimeoutMS,
|
socketTimeoutMS,
|
||||||
useNewUrlParser: true,
|
useNewUrlParser: true,
|
||||||
};
|
};
|
||||||
|
if (process.env.MONGO_POOL_SIZE &&
|
||||||
|
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||||
|
options.minPoolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||||
|
}
|
||||||
if (process.env.MONGO_POOL_SIZE &&
|
if (process.env.MONGO_POOL_SIZE &&
|
||||||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||||
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||||
}
|
}
|
||||||
|
if (process.env.MONGO_MAX_POOL_SIZE &&
|
||||||
|
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
|
||||||
|
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
|
||||||
|
}
|
||||||
|
this.bulked = process.env.BULK_FREQ;
|
||||||
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
this.logger.error('error connecting to mongodb',
|
this.logger.error('error connecting to mongodb',
|
||||||
|
@ -170,10 +179,77 @@ class MongoClientInterface {
|
||||||
this.cacheMiss = 0;
|
this.cacheMiss = 0;
|
||||||
}, 300000);
|
}, 300000);
|
||||||
|
|
||||||
|
this.bulks = {};
|
||||||
|
this.bulkUpdates();
|
||||||
|
|
||||||
return this.usersBucketHack(cb);
|
return this.usersBucketHack(cb);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bulkUpdates() {
|
||||||
|
if (!this.bulked) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Every BULK_FREQ || 10ms, bulk write everything we have
|
||||||
|
setInterval(() => {
|
||||||
|
|
||||||
|
console.log('INTERVAL -- ', this.bulks);
|
||||||
|
Object.keys(this.bulks).forEach(async key => {
|
||||||
|
const c = this.bulks[key];
|
||||||
|
if (c.operations.length === 0) {
|
||||||
|
console.log(' > NO ', key);
|
||||||
|
} else {
|
||||||
|
let globalOps = [];
|
||||||
|
let callbacks = [];
|
||||||
|
c.operations.forEach(op => {
|
||||||
|
globalOps = globalOps.concat(op.list);
|
||||||
|
callbacks = callbacks.concat(op.onDone);
|
||||||
|
});
|
||||||
|
// Empty the arrays
|
||||||
|
c.operations = [];
|
||||||
|
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
|
||||||
|
await c.client.bulkWrite(globalOps, {
|
||||||
|
ordered: true,
|
||||||
|
});
|
||||||
|
callbacks.forEach(cb => cb());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, Number(process.env.BULK_FREQ) || 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Inits the bulk object
|
||||||
|
* @param {string} bucketName - the name of the bucket/collection
|
||||||
|
* @param {Object} c - collection db object
|
||||||
|
*/
|
||||||
|
initBulk(bucketName, c) {
|
||||||
|
if (!this.bulks[bucketName]) {
|
||||||
|
this.bulks[bucketName] = {
|
||||||
|
client: c,
|
||||||
|
operations: [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add an array of operations to the bulker for the current bucket
|
||||||
|
*
|
||||||
|
* @param {string} bucketName - name of the bucket
|
||||||
|
* @param {Array} operations - list of operations
|
||||||
|
* @param {callback} onDone - callback once operations are done
|
||||||
|
*/
|
||||||
|
addToBucketBulk(bucketName, operations, onDone) {
|
||||||
|
/*
|
||||||
|
Example of operations
|
||||||
|
[{operations:[...], onDone}, {operations:[...], onDone}, {operations:[...], onDone}]
|
||||||
|
*/
|
||||||
|
this.bulks[bucketName].operations = this.bulks[bucketName].operations.concat({
|
||||||
|
list: operations,
|
||||||
|
onDone,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
usersBucketHack(cb) {
|
usersBucketHack(cb) {
|
||||||
/* FIXME: Since the bucket creation API is expecting the
|
/* FIXME: Since the bucket creation API is expecting the
|
||||||
usersBucket to have attributes, we pre-create the
|
usersBucket to have attributes, we pre-create the
|
||||||
|
@ -299,6 +375,11 @@ class MongoClientInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
getBucketAttributes(bucketName, log, cb) {
|
getBucketAttributes(bucketName, log, cb) {
|
||||||
|
// Return the cached bucket if exists
|
||||||
|
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
|
||||||
|
if (cachedBucket) {
|
||||||
|
return cb(null, cachedBucket);
|
||||||
|
}
|
||||||
const m = this.getCollection(METASTORE);
|
const m = this.getCollection(METASTORE);
|
||||||
m.findOne({
|
m.findOne({
|
||||||
_id: bucketName,
|
_id: bucketName,
|
||||||
|
@ -316,6 +397,8 @@ class MongoClientInterface {
|
||||||
// that properly inits w/o JSON.parse()
|
// that properly inits w/o JSON.parse()
|
||||||
const bucketMDStr = JSON.stringify(doc.value);
|
const bucketMDStr = JSON.stringify(doc.value);
|
||||||
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
||||||
|
// Cache the result
|
||||||
|
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
|
||||||
return cb(null, bucketMD);
|
return cb(null, bucketMD);
|
||||||
});
|
});
|
||||||
return undefined;
|
return undefined;
|
||||||
|
@ -531,45 +614,7 @@ class MongoClientInterface {
|
||||||
objVal.versionId = versionId;
|
objVal.versionId = versionId;
|
||||||
const versionKey = formatVersionKey(objName, versionId, params.vFormat);
|
const versionKey = formatVersionKey(objName, versionId, params.vFormat);
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
// initiating array of operations with version creation
|
const onDone = err => {
|
||||||
const ops = [{
|
|
||||||
updateOne: {
|
|
||||||
filter: {
|
|
||||||
_id: versionKey,
|
|
||||||
},
|
|
||||||
update: {
|
|
||||||
$set: { _id: versionKey, value: objVal },
|
|
||||||
},
|
|
||||||
upsert: true,
|
|
||||||
},
|
|
||||||
}];
|
|
||||||
// filter to get master
|
|
||||||
const filter = {
|
|
||||||
_id: masterKey,
|
|
||||||
$or: [{
|
|
||||||
'value.versionId': {
|
|
||||||
$exists: false,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
'value.versionId': {
|
|
||||||
$gt: objVal.versionId,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
],
|
|
||||||
};
|
|
||||||
// values to update master
|
|
||||||
const update = {
|
|
||||||
$set: { _id: masterKey, value: objVal },
|
|
||||||
};
|
|
||||||
// updating or deleting master depending on the last version put
|
|
||||||
// in v0 the master gets updated, in v1 the master gets deleted if version is
|
|
||||||
// a delete marker or updated otherwise.
|
|
||||||
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
|
|
||||||
ops.push(masterOp);
|
|
||||||
c.bulkWrite(ops, {
|
|
||||||
ordered: true,
|
|
||||||
}, err => {
|
|
||||||
/*
|
/*
|
||||||
* Related to https://jira.mongodb.org/browse/SERVER-14322
|
* Related to https://jira.mongodb.org/browse/SERVER-14322
|
||||||
* It happens when we are pushing two versions "at the same time"
|
* It happens when we are pushing two versions "at the same time"
|
||||||
|
@ -613,7 +658,51 @@ class MongoClientInterface {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return cb(null, `{"versionId": "${versionId}"}`);
|
return cb(null, `{"versionId": "${versionId}"}`);
|
||||||
});
|
};
|
||||||
|
|
||||||
|
// initiating array of operations with version creation
|
||||||
|
const ops = [{
|
||||||
|
updateOne: {
|
||||||
|
filter: {
|
||||||
|
_id: versionKey,
|
||||||
|
},
|
||||||
|
update: {
|
||||||
|
$set: { _id: versionKey, value: objVal },
|
||||||
|
},
|
||||||
|
upsert: true,
|
||||||
|
},
|
||||||
|
}];
|
||||||
|
// filter to get master
|
||||||
|
const filter = {
|
||||||
|
_id: masterKey,
|
||||||
|
$or: [{
|
||||||
|
'value.versionId': {
|
||||||
|
$exists: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
'value.versionId': {
|
||||||
|
$gt: objVal.versionId,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
// values to update master
|
||||||
|
const update = {
|
||||||
|
$set: { _id: masterKey, value: objVal },
|
||||||
|
};
|
||||||
|
// updating or deleting master depending on the last version put
|
||||||
|
// in v0 the master gets updated, in v1 the master gets deleted if version is
|
||||||
|
// a delete marker or updated otherwise.
|
||||||
|
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
|
||||||
|
ops.push(masterOp);
|
||||||
|
if (this.bulked) {
|
||||||
|
this.addToBucketBulk(bucketName, ops, onDone);
|
||||||
|
} else {
|
||||||
|
c.bulkWrite(ops, {
|
||||||
|
ordered: true,
|
||||||
|
}, onDone);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -634,14 +723,7 @@ class MongoClientInterface {
|
||||||
// eslint-disable-next-line
|
// eslint-disable-next-line
|
||||||
objVal.versionId = versionId;
|
objVal.versionId = versionId;
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
c.update({
|
const onDone = err => {
|
||||||
_id: masterKey,
|
|
||||||
}, {
|
|
||||||
_id: masterKey,
|
|
||||||
value: objVal,
|
|
||||||
}, {
|
|
||||||
upsert: true,
|
|
||||||
}, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
log.error(
|
||||||
'putObjectVerCase2: error putting object version',
|
'putObjectVerCase2: error putting object version',
|
||||||
|
@ -649,7 +731,25 @@ class MongoClientInterface {
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
||||||
});
|
};
|
||||||
|
if (this.bulked) {
|
||||||
|
this.addToBucketBulk(bucketName, [{
|
||||||
|
updateOne: {
|
||||||
|
filter: { _id: masterKey },
|
||||||
|
update: { $set: { _id: masterKey, value: objVal } },
|
||||||
|
upsert: true,
|
||||||
|
}
|
||||||
|
}], onDone);
|
||||||
|
} else {
|
||||||
|
c.update({
|
||||||
|
_id: masterKey,
|
||||||
|
}, {
|
||||||
|
_id: masterKey,
|
||||||
|
value: objVal,
|
||||||
|
}, {
|
||||||
|
upsert: true,
|
||||||
|
}, onDone);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -785,14 +885,7 @@ class MongoClientInterface {
|
||||||
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
|
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
c.update({
|
const onDone = err => {
|
||||||
_id: versionKey,
|
|
||||||
}, {
|
|
||||||
_id: versionKey,
|
|
||||||
value: objVal,
|
|
||||||
}, {
|
|
||||||
upsert: true,
|
|
||||||
}, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
log.error(
|
||||||
'putObjectVerCase4: error upserting object version',
|
'putObjectVerCase4: error upserting object version',
|
||||||
|
@ -858,7 +951,25 @@ class MongoClientInterface {
|
||||||
return undefined;
|
return undefined;
|
||||||
});
|
});
|
||||||
return undefined;
|
return undefined;
|
||||||
});
|
};
|
||||||
|
if (this.bulked) {
|
||||||
|
this.addToBucketBulk(bucketName, [{
|
||||||
|
updateOne: {
|
||||||
|
filter: { _id: versionKey },
|
||||||
|
update: { $set: { _id: versionKey, value: objVal } },
|
||||||
|
upsert: true,
|
||||||
|
}
|
||||||
|
}], onDone);
|
||||||
|
} else {
|
||||||
|
c.update({
|
||||||
|
_id: versionKey,
|
||||||
|
}, {
|
||||||
|
_id: versionKey,
|
||||||
|
value: objVal,
|
||||||
|
}, {
|
||||||
|
upsert: true,
|
||||||
|
}, onDone);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -874,14 +985,7 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
|
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
c.update({
|
const onDone = err => {
|
||||||
_id: masterKey,
|
|
||||||
}, {
|
|
||||||
_id: masterKey,
|
|
||||||
value: objVal,
|
|
||||||
}, {
|
|
||||||
upsert: true,
|
|
||||||
}, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error(
|
log.error(
|
||||||
'putObjectNoVer: error putting obect with no versioning',
|
'putObjectNoVer: error putting obect with no versioning',
|
||||||
|
@ -889,7 +993,26 @@ class MongoClientInterface {
|
||||||
return cb(errors.InternalError);
|
return cb(errors.InternalError);
|
||||||
}
|
}
|
||||||
return cb();
|
return cb();
|
||||||
});
|
};
|
||||||
|
|
||||||
|
if (this.bulked) {
|
||||||
|
this.addToBucketBulk(bucketName, [{
|
||||||
|
updateOne: {
|
||||||
|
filter: { _id: masterKey },
|
||||||
|
update: { $set: { _id: masterKey, value: objVal } },
|
||||||
|
upsert: true,
|
||||||
|
}
|
||||||
|
}], onDone);
|
||||||
|
} else {
|
||||||
|
c.update({
|
||||||
|
_id: masterKey,
|
||||||
|
}, {
|
||||||
|
_id: masterKey,
|
||||||
|
value: objVal,
|
||||||
|
}, {
|
||||||
|
upsert: true,
|
||||||
|
}, onDone);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -925,6 +1048,8 @@ class MongoClientInterface {
|
||||||
putObject(bucketName, objName, objVal, params, log, cb) {
|
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||||
MongoUtils.serialize(objVal);
|
MongoUtils.serialize(objVal);
|
||||||
const c = this.getCollection(bucketName);
|
const c = this.getCollection(bucketName);
|
||||||
|
// Create if needed the bulk object
|
||||||
|
this.initBulk(bucketName, c);
|
||||||
const _params = Object.assign({}, params);
|
const _params = Object.assign({}, params);
|
||||||
return this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
return this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
|
Loading…
Reference in New Issue