Compare commits

...

15 Commits

Author SHA1 Message Date
williamlardier df7d30c164
wip 2023-02-16 14:47:55 +01:00
williamlardier ad19118d2c
wip 2023-02-16 14:44:42 +01:00
williamlardier dbd6f439db
wip 2023-02-16 14:29:09 +01:00
williamlardier b2b133b824
wip 2023-02-09 14:38:04 +01:00
williamlardier 9ee0da7084
wip 2023-02-09 14:23:22 +01:00
williamlardier 61f26ffe02
testing 2023-02-08 10:08:00 +01:00
williamlardier 952fe9197a
testing 2023-02-08 09:51:25 +01:00
williamlardier 5ba1cd697b
testing 2023-02-08 09:39:32 +01:00
williamlardier 497c76dfd7
testing 2023-02-08 08:25:50 +01:00
williamlardier 3595e7ad7b
wip 2023-02-02 08:10:51 +01:00
williamlardier a02b874942
wip 2023-02-02 08:03:41 +01:00
williamlardier 48ae3f966e
wip 2023-02-02 07:57:30 +01:00
williamlardier 509fbc8bf8
test 2023-02-02 05:58:49 +01:00
williamlardier 4c3e4108a5
test3 2023-02-02 05:58:49 +01:00
williamlardier 206f4f412d
test2 2023-02-02 05:58:46 +01:00
3 changed files with 199 additions and 68 deletions

View File

@ -279,10 +279,11 @@ class MultipleBackendGateway {
} }
objectTagging(method, key, bucket, objectMD, log, cb) { objectTagging(method, key, bucket, objectMD, log, cb) {
console.log('>> objectTagging', method, key, bucket, objectMD);
// if legacy, objectMD will not contain dataStoreName, so just return // if legacy, objectMD will not contain dataStoreName, so just return
const client = this.clients[objectMD.dataStoreName]; const client = this.clients[objectMD.dataStoreName];
if (client && client[`object${method}Tagging`]) { if (client && client[`object${method}Tagging`]) {
return client[`object${method}Tagging`](key, bucket, objectMD, log, return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log,
cb); cb);
} }
return cb(); return cb();

View File

@ -69,6 +69,7 @@ class AwsClient {
_createAwsKey(requestBucketName, requestObjectKey, _createAwsKey(requestBucketName, requestObjectKey,
bucketMatch) { bucketMatch) {
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
if (bucketMatch) { if (bucketMatch) {
return requestObjectKey; return requestObjectKey;
} }
@ -489,14 +490,17 @@ class AwsClient {
} }
objectPutTagging(key, bucketName, objectMD, log, callback) { objectPutTagging(key, bucketName, objectMD, log, callback) {
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
const awsBucket = this._awsBucketName; const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch); const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId; const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
const tagParams = { const tagParams = {
Bucket: awsBucket, Bucket: awsBucket,
Key: awsKey, Key: awsKey,
VersionId: dataStoreVersionId, VersionId: dataStoreVersionId,
}; };
console.log('2 >>', tagParams);
const keyArray = Object.keys(objectMD.tags); const keyArray = Object.keys(objectMD.tags);
tagParams.Tagging = {}; tagParams.Tagging = {};
tagParams.Tagging.TagSet = keyArray.map(key => { tagParams.Tagging.TagSet = keyArray.map(key => {
@ -505,6 +509,7 @@ class AwsClient {
}); });
return this._client.putObjectTagging(tagParams, err => { return this._client.putObjectTagging(tagParams, err => {
if (err) { if (err) {
console.log('ERROR!! >>', err);
logHelper(log, 'error', 'error from data backend on ' + logHelper(log, 'error', 'error from data backend on ' +
'putObjectTagging', err, 'putObjectTagging', err,
this._dataStoreName, this.clientType); this._dataStoreName, this.clientType);

View File

@ -144,10 +144,19 @@ class MongoClientInterface {
socketTimeoutMS, socketTimeoutMS,
useNewUrlParser: true, useNewUrlParser: true,
}; };
if (process.env.MONGO_POOL_SIZE &&
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
options.minPoolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
}
if (process.env.MONGO_POOL_SIZE && if (process.env.MONGO_POOL_SIZE &&
!Number.isNaN(process.env.MONGO_POOL_SIZE)) { !Number.isNaN(process.env.MONGO_POOL_SIZE)) {
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10); options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
} }
if (process.env.MONGO_MAX_POOL_SIZE &&
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
}
this.bulked = process.env.BULK_FREQ;
return MongoClient.connect(this.mongoUrl, options, (err, client) => { return MongoClient.connect(this.mongoUrl, options, (err, client) => {
if (err) { if (err) {
this.logger.error('error connecting to mongodb', this.logger.error('error connecting to mongodb',
@ -170,10 +179,77 @@ class MongoClientInterface {
this.cacheMiss = 0; this.cacheMiss = 0;
}, 300000); }, 300000);
this.bulks = {};
this.bulkUpdates();
return this.usersBucketHack(cb); return this.usersBucketHack(cb);
}); });
} }
bulkUpdates() {
if (!this.bulked) {
return;
}
// Every BULK_FREQ || 10ms, bulk write everything we have
setInterval(() => {
console.log('INTERVAL -- ', this.bulks);
Object.keys(this.bulks).forEach(async key => {
const c = this.bulks[key];
if (c.operations.length === 0) {
console.log(' > NO ', key);
} else {
let globalOps = [];
let callbacks = [];
c.operations.forEach(op => {
globalOps = globalOps.concat(op.list);
callbacks = callbacks.concat(op.onDone);
});
// Empty the arrays
c.operations = [];
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
await c.client.bulkWrite(globalOps, {
ordered: true,
});
callbacks.forEach(cb => cb());
}
});
}, Number(process.env.BULK_FREQ) || 10);
}
/**
* Inits the bulk object
* @param {string} bucketName - the name of the bucket/collection
* @param {Object} c - collection db object
*/
initBulk(bucketName, c) {
if (!this.bulks[bucketName]) {
this.bulks[bucketName] = {
client: c,
operations: [],
};
}
}
/**
* Add an array of operations to the bulker for the current bucket
*
* @param {string} bucketName - name of the bucket
* @param {Array} operations - list of operations
* @param {callback} onDone - callback once operations are done
*/
addToBucketBulk(bucketName, operations, onDone) {
/*
Example of operations
[{operations:[...], onDone}, {operations:[...], onDone}, {operations:[...], onDone}]
*/
this.bulks[bucketName].operations = this.bulks[bucketName].operations.concat({
list: operations,
onDone,
});
}
usersBucketHack(cb) { usersBucketHack(cb) {
/* FIXME: Since the bucket creation API is expecting the /* FIXME: Since the bucket creation API is expecting the
usersBucket to have attributes, we pre-create the usersBucket to have attributes, we pre-create the
@ -299,6 +375,11 @@ class MongoClientInterface {
* @return {undefined} * @return {undefined}
*/ */
getBucketAttributes(bucketName, log, cb) { getBucketAttributes(bucketName, log, cb) {
// Return the cached bucket if exists
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
if (cachedBucket) {
return cb(null, cachedBucket);
}
const m = this.getCollection(METASTORE); const m = this.getCollection(METASTORE);
m.findOne({ m.findOne({
_id: bucketName, _id: bucketName,
@ -316,6 +397,8 @@ class MongoClientInterface {
// that properly inits w/o JSON.parse() // that properly inits w/o JSON.parse()
const bucketMDStr = JSON.stringify(doc.value); const bucketMDStr = JSON.stringify(doc.value);
const bucketMD = BucketInfo.deSerialize(bucketMDStr); const bucketMD = BucketInfo.deSerialize(bucketMDStr);
// Cache the result
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
return cb(null, bucketMD); return cb(null, bucketMD);
}); });
return undefined; return undefined;
@ -531,45 +614,7 @@ class MongoClientInterface {
objVal.versionId = versionId; objVal.versionId = versionId;
const versionKey = formatVersionKey(objName, versionId, params.vFormat); const versionKey = formatVersionKey(objName, versionId, params.vFormat);
const masterKey = formatMasterKey(objName, params.vFormat); const masterKey = formatMasterKey(objName, params.vFormat);
// initiating array of operations with version creation const onDone = err => {
const ops = [{
updateOne: {
filter: {
_id: versionKey,
},
update: {
$set: { _id: versionKey, value: objVal },
},
upsert: true,
},
}];
// filter to get master
const filter = {
_id: masterKey,
$or: [{
'value.versionId': {
$exists: false,
},
},
{
'value.versionId': {
$gt: objVal.versionId,
},
},
],
};
// values to update master
const update = {
$set: { _id: masterKey, value: objVal },
};
// updating or deleting master depending on the last version put
// in v0 the master gets updated, in v1 the master gets deleted if version is
// a delete marker or updated otherwise.
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
ops.push(masterOp);
c.bulkWrite(ops, {
ordered: true,
}, err => {
/* /*
* Related to https://jira.mongodb.org/browse/SERVER-14322 * Related to https://jira.mongodb.org/browse/SERVER-14322
* It happens when we are pushing two versions "at the same time" * It happens when we are pushing two versions "at the same time"
@ -613,7 +658,51 @@ class MongoClientInterface {
} }
} }
return cb(null, `{"versionId": "${versionId}"}`); return cb(null, `{"versionId": "${versionId}"}`);
}); };
// initiating array of operations with version creation
const ops = [{
updateOne: {
filter: {
_id: versionKey,
},
update: {
$set: { _id: versionKey, value: objVal },
},
upsert: true,
},
}];
// filter to get master
const filter = {
_id: masterKey,
$or: [{
'value.versionId': {
$exists: false,
},
},
{
'value.versionId': {
$gt: objVal.versionId,
},
},
],
};
// values to update master
const update = {
$set: { _id: masterKey, value: objVal },
};
// updating or deleting master depending on the last version put
// in v0 the master gets updated, in v1 the master gets deleted if version is
// a delete marker or updated otherwise.
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
ops.push(masterOp);
if (this.bulked) {
this.addToBucketBulk(bucketName, ops, onDone);
} else {
c.bulkWrite(ops, {
ordered: true,
}, onDone);
}
} }
/** /**
@ -634,14 +723,7 @@ class MongoClientInterface {
// eslint-disable-next-line // eslint-disable-next-line
objVal.versionId = versionId; objVal.versionId = versionId;
const masterKey = formatMasterKey(objName, params.vFormat); const masterKey = formatMasterKey(objName, params.vFormat);
c.update({ const onDone = err => {
_id: masterKey,
}, {
_id: masterKey,
value: objVal,
}, {
upsert: true,
}, err => {
if (err) { if (err) {
log.error( log.error(
'putObjectVerCase2: error putting object version', 'putObjectVerCase2: error putting object version',
@ -649,7 +731,25 @@ class MongoClientInterface {
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null, `{"versionId": "${objVal.versionId}"}`); return cb(null, `{"versionId": "${objVal.versionId}"}`);
}); };
if (this.bulked) {
this.addToBucketBulk(bucketName, [{
updateOne: {
filter: { _id: masterKey },
update: { $set: { _id: masterKey, value: objVal } },
upsert: true,
}
}], onDone);
} else {
c.update({
_id: masterKey,
}, {
_id: masterKey,
value: objVal,
}, {
upsert: true,
}, onDone);
}
} }
/** /**
@ -785,14 +885,7 @@ class MongoClientInterface {
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) { putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat); const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
const masterKey = formatMasterKey(objName, params.vFormat); const masterKey = formatMasterKey(objName, params.vFormat);
c.update({ const onDone = err => {
_id: versionKey,
}, {
_id: versionKey,
value: objVal,
}, {
upsert: true,
}, err => {
if (err) { if (err) {
log.error( log.error(
'putObjectVerCase4: error upserting object version', 'putObjectVerCase4: error upserting object version',
@ -858,7 +951,25 @@ class MongoClientInterface {
return undefined; return undefined;
}); });
return undefined; return undefined;
}); };
if (this.bulked) {
this.addToBucketBulk(bucketName, [{
updateOne: {
filter: { _id: versionKey },
update: { $set: { _id: versionKey, value: objVal } },
upsert: true,
}
}], onDone);
} else {
c.update({
_id: versionKey,
}, {
_id: versionKey,
value: objVal,
}, {
upsert: true,
}, onDone);
}
} }
/** /**
@ -874,14 +985,7 @@ class MongoClientInterface {
*/ */
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) { putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
const masterKey = formatMasterKey(objName, params.vFormat); const masterKey = formatMasterKey(objName, params.vFormat);
c.update({ const onDone = err => {
_id: masterKey,
}, {
_id: masterKey,
value: objVal,
}, {
upsert: true,
}, err => {
if (err) { if (err) {
log.error( log.error(
'putObjectNoVer: error putting obect with no versioning', 'putObjectNoVer: error putting obect with no versioning',
@ -889,7 +993,26 @@ class MongoClientInterface {
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(); return cb();
}); };
if (this.bulked) {
this.addToBucketBulk(bucketName, [{
updateOne: {
filter: { _id: masterKey },
update: { $set: { _id: masterKey, value: objVal } },
upsert: true,
}
}], onDone);
} else {
c.update({
_id: masterKey,
}, {
_id: masterKey,
value: objVal,
}, {
upsert: true,
}, onDone);
}
} }
/** /**
@ -925,6 +1048,8 @@ class MongoClientInterface {
putObject(bucketName, objName, objVal, params, log, cb) { putObject(bucketName, objName, objVal, params, log, cb) {
MongoUtils.serialize(objVal); MongoUtils.serialize(objVal);
const c = this.getCollection(bucketName); const c = this.getCollection(bucketName);
// Create if needed the bulk object
this.initBulk(bucketName, c);
const _params = Object.assign({}, params); const _params = Object.assign({}, params);
return this.getBucketVFormat(bucketName, log, (err, vFormat) => { return this.getBucketVFormat(bucketName, log, (err, vFormat) => {
if (err) { if (err) {