Compare commits
15 Commits
developmen
...
task/arsen
Author | SHA1 | Date |
---|---|---|
williamlardier | df7d30c164 | |
williamlardier | ad19118d2c | |
williamlardier | dbd6f439db | |
williamlardier | b2b133b824 | |
williamlardier | 9ee0da7084 | |
williamlardier | 61f26ffe02 | |
williamlardier | 952fe9197a | |
williamlardier | 5ba1cd697b | |
williamlardier | 497c76dfd7 | |
williamlardier | 3595e7ad7b | |
williamlardier | a02b874942 | |
williamlardier | 48ae3f966e | |
williamlardier | 509fbc8bf8 | |
williamlardier | 4c3e4108a5 | |
williamlardier | 206f4f412d |
|
@ -279,10 +279,11 @@ class MultipleBackendGateway {
|
|||
}
|
||||
|
||||
objectTagging(method, key, bucket, objectMD, log, cb) {
|
||||
console.log('>> objectTagging', method, key, bucket, objectMD);
|
||||
// if legacy, objectMD will not contain dataStoreName, so just return
|
||||
const client = this.clients[objectMD.dataStoreName];
|
||||
if (client && client[`object${method}Tagging`]) {
|
||||
return client[`object${method}Tagging`](key, bucket, objectMD, log,
|
||||
return client[`object${method}Tagging`](key, bucket.getName(), objectMD, log,
|
||||
cb);
|
||||
}
|
||||
return cb();
|
||||
|
|
|
@ -69,6 +69,7 @@ class AwsClient {
|
|||
|
||||
_createAwsKey(requestBucketName, requestObjectKey,
|
||||
bucketMatch) {
|
||||
console.log('===', requestBucketName, requestObjectKey, bucketMatch);
|
||||
if (bucketMatch) {
|
||||
return requestObjectKey;
|
||||
}
|
||||
|
@ -489,14 +490,17 @@ class AwsClient {
|
|||
}
|
||||
|
||||
objectPutTagging(key, bucketName, objectMD, log, callback) {
|
||||
console.log('0 >>', this._client.config, this._bucketMatch, '--->', this._createAwsKey(bucketName, key, this._bucketMatch));
|
||||
const awsBucket = this._awsBucketName;
|
||||
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
|
||||
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
|
||||
console.log('1 >>', JSON.stringify(objectMD), key, awsKey);
|
||||
const tagParams = {
|
||||
Bucket: awsBucket,
|
||||
Key: awsKey,
|
||||
VersionId: dataStoreVersionId,
|
||||
};
|
||||
console.log('2 >>', tagParams);
|
||||
const keyArray = Object.keys(objectMD.tags);
|
||||
tagParams.Tagging = {};
|
||||
tagParams.Tagging.TagSet = keyArray.map(key => {
|
||||
|
@ -505,6 +509,7 @@ class AwsClient {
|
|||
});
|
||||
return this._client.putObjectTagging(tagParams, err => {
|
||||
if (err) {
|
||||
console.log('ERROR!! >>', err);
|
||||
logHelper(log, 'error', 'error from data backend on ' +
|
||||
'putObjectTagging', err,
|
||||
this._dataStoreName, this.clientType);
|
||||
|
|
|
@ -144,10 +144,19 @@ class MongoClientInterface {
|
|||
socketTimeoutMS,
|
||||
useNewUrlParser: true,
|
||||
};
|
||||
if (process.env.MONGO_POOL_SIZE &&
|
||||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||
options.minPoolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||
}
|
||||
if (process.env.MONGO_POOL_SIZE &&
|
||||
!Number.isNaN(process.env.MONGO_POOL_SIZE)) {
|
||||
options.poolSize = Number.parseInt(process.env.MONGO_POOL_SIZE, 10);
|
||||
}
|
||||
if (process.env.MONGO_MAX_POOL_SIZE &&
|
||||
!Number.isNaN(process.env.MONGO_MAX_POOL_SIZE)) {
|
||||
options.maxPoolSize = Number.parseInt(process.env.MONGO_MAX_POOL_SIZE, 10);
|
||||
}
|
||||
this.bulked = process.env.BULK_FREQ;
|
||||
return MongoClient.connect(this.mongoUrl, options, (err, client) => {
|
||||
if (err) {
|
||||
this.logger.error('error connecting to mongodb',
|
||||
|
@ -170,10 +179,77 @@ class MongoClientInterface {
|
|||
this.cacheMiss = 0;
|
||||
}, 300000);
|
||||
|
||||
this.bulks = {};
|
||||
this.bulkUpdates();
|
||||
|
||||
return this.usersBucketHack(cb);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
bulkUpdates() {
|
||||
if (!this.bulked) {
|
||||
return;
|
||||
}
|
||||
// Every BULK_FREQ || 10ms, bulk write everything we have
|
||||
setInterval(() => {
|
||||
|
||||
console.log('INTERVAL -- ', this.bulks);
|
||||
Object.keys(this.bulks).forEach(async key => {
|
||||
const c = this.bulks[key];
|
||||
if (c.operations.length === 0) {
|
||||
console.log(' > NO ', key);
|
||||
} else {
|
||||
let globalOps = [];
|
||||
let callbacks = [];
|
||||
c.operations.forEach(op => {
|
||||
globalOps = globalOps.concat(op.list);
|
||||
callbacks = callbacks.concat(op.onDone);
|
||||
});
|
||||
// Empty the arrays
|
||||
c.operations = [];
|
||||
console.log(' > ', key, 'opnb', globalOps.length, 'cbnb', callbacks.length);
|
||||
await c.client.bulkWrite(globalOps, {
|
||||
ordered: true,
|
||||
});
|
||||
callbacks.forEach(cb => cb());
|
||||
}
|
||||
});
|
||||
}, Number(process.env.BULK_FREQ) || 10);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inits the bulk object
|
||||
* @param {string} bucketName - the name of the bucket/collection
|
||||
* @param {Object} c - collection db object
|
||||
*/
|
||||
initBulk(bucketName, c) {
|
||||
if (!this.bulks[bucketName]) {
|
||||
this.bulks[bucketName] = {
|
||||
client: c,
|
||||
operations: [],
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add an array of operations to the bulker for the current bucket
|
||||
*
|
||||
* @param {string} bucketName - name of the bucket
|
||||
* @param {Array} operations - list of operations
|
||||
* @param {callback} onDone - callback once operations are done
|
||||
*/
|
||||
addToBucketBulk(bucketName, operations, onDone) {
|
||||
/*
|
||||
Example of operations
|
||||
[{operations:[...], onDone}, {operations:[...], onDone}, {operations:[...], onDone}]
|
||||
*/
|
||||
this.bulks[bucketName].operations = this.bulks[bucketName].operations.concat({
|
||||
list: operations,
|
||||
onDone,
|
||||
});
|
||||
}
|
||||
|
||||
usersBucketHack(cb) {
|
||||
/* FIXME: Since the bucket creation API is expecting the
|
||||
usersBucket to have attributes, we pre-create the
|
||||
|
@ -299,6 +375,11 @@ class MongoClientInterface {
|
|||
* @return {undefined}
|
||||
*/
|
||||
getBucketAttributes(bucketName, log, cb) {
|
||||
// Return the cached bucket if exists
|
||||
const cachedBucket = this.bucketVFormatCache.get(bucketName+'bkt');
|
||||
if (cachedBucket) {
|
||||
return cb(null, cachedBucket);
|
||||
}
|
||||
const m = this.getCollection(METASTORE);
|
||||
m.findOne({
|
||||
_id: bucketName,
|
||||
|
@ -316,6 +397,8 @@ class MongoClientInterface {
|
|||
// that properly inits w/o JSON.parse()
|
||||
const bucketMDStr = JSON.stringify(doc.value);
|
||||
const bucketMD = BucketInfo.deSerialize(bucketMDStr);
|
||||
// Cache the result
|
||||
this.bucketVFormatCache.add(bucketName+'bkt', bucketMD);
|
||||
return cb(null, bucketMD);
|
||||
});
|
||||
return undefined;
|
||||
|
@ -531,45 +614,7 @@ class MongoClientInterface {
|
|||
objVal.versionId = versionId;
|
||||
const versionKey = formatVersionKey(objName, versionId, params.vFormat);
|
||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||
// initiating array of operations with version creation
|
||||
const ops = [{
|
||||
updateOne: {
|
||||
filter: {
|
||||
_id: versionKey,
|
||||
},
|
||||
update: {
|
||||
$set: { _id: versionKey, value: objVal },
|
||||
},
|
||||
upsert: true,
|
||||
},
|
||||
}];
|
||||
// filter to get master
|
||||
const filter = {
|
||||
_id: masterKey,
|
||||
$or: [{
|
||||
'value.versionId': {
|
||||
$exists: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
'value.versionId': {
|
||||
$gt: objVal.versionId,
|
||||
},
|
||||
},
|
||||
],
|
||||
};
|
||||
// values to update master
|
||||
const update = {
|
||||
$set: { _id: masterKey, value: objVal },
|
||||
};
|
||||
// updating or deleting master depending on the last version put
|
||||
// in v0 the master gets updated, in v1 the master gets deleted if version is
|
||||
// a delete marker or updated otherwise.
|
||||
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
|
||||
ops.push(masterOp);
|
||||
c.bulkWrite(ops, {
|
||||
ordered: true,
|
||||
}, err => {
|
||||
const onDone = err => {
|
||||
/*
|
||||
* Related to https://jira.mongodb.org/browse/SERVER-14322
|
||||
* It happens when we are pushing two versions "at the same time"
|
||||
|
@ -613,7 +658,51 @@ class MongoClientInterface {
|
|||
}
|
||||
}
|
||||
return cb(null, `{"versionId": "${versionId}"}`);
|
||||
});
|
||||
};
|
||||
|
||||
// initiating array of operations with version creation
|
||||
const ops = [{
|
||||
updateOne: {
|
||||
filter: {
|
||||
_id: versionKey,
|
||||
},
|
||||
update: {
|
||||
$set: { _id: versionKey, value: objVal },
|
||||
},
|
||||
upsert: true,
|
||||
},
|
||||
}];
|
||||
// filter to get master
|
||||
const filter = {
|
||||
_id: masterKey,
|
||||
$or: [{
|
||||
'value.versionId': {
|
||||
$exists: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
'value.versionId': {
|
||||
$gt: objVal.versionId,
|
||||
},
|
||||
},
|
||||
],
|
||||
};
|
||||
// values to update master
|
||||
const update = {
|
||||
$set: { _id: masterKey, value: objVal },
|
||||
};
|
||||
// updating or deleting master depending on the last version put
|
||||
// in v0 the master gets updated, in v1 the master gets deleted if version is
|
||||
// a delete marker or updated otherwise.
|
||||
const masterOp = this.updateDeleteMaster(objVal.isDeleteMarker, params.vFormat, filter, update, true);
|
||||
ops.push(masterOp);
|
||||
if (this.bulked) {
|
||||
this.addToBucketBulk(bucketName, ops, onDone);
|
||||
} else {
|
||||
c.bulkWrite(ops, {
|
||||
ordered: true,
|
||||
}, onDone);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -634,14 +723,7 @@ class MongoClientInterface {
|
|||
// eslint-disable-next-line
|
||||
objVal.versionId = versionId;
|
||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||
c.update({
|
||||
_id: masterKey,
|
||||
}, {
|
||||
_id: masterKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, err => {
|
||||
const onDone = err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'putObjectVerCase2: error putting object version',
|
||||
|
@ -649,7 +731,25 @@ class MongoClientInterface {
|
|||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null, `{"versionId": "${objVal.versionId}"}`);
|
||||
});
|
||||
};
|
||||
if (this.bulked) {
|
||||
this.addToBucketBulk(bucketName, [{
|
||||
updateOne: {
|
||||
filter: { _id: masterKey },
|
||||
update: { $set: { _id: masterKey, value: objVal } },
|
||||
upsert: true,
|
||||
}
|
||||
}], onDone);
|
||||
} else {
|
||||
c.update({
|
||||
_id: masterKey,
|
||||
}, {
|
||||
_id: masterKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, onDone);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -785,14 +885,7 @@ class MongoClientInterface {
|
|||
putObjectVerCase4(c, bucketName, objName, objVal, params, log, cb) {
|
||||
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
|
||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||
c.update({
|
||||
_id: versionKey,
|
||||
}, {
|
||||
_id: versionKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, err => {
|
||||
const onDone = err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'putObjectVerCase4: error upserting object version',
|
||||
|
@ -858,7 +951,25 @@ class MongoClientInterface {
|
|||
return undefined;
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
};
|
||||
if (this.bulked) {
|
||||
this.addToBucketBulk(bucketName, [{
|
||||
updateOne: {
|
||||
filter: { _id: versionKey },
|
||||
update: { $set: { _id: versionKey, value: objVal } },
|
||||
upsert: true,
|
||||
}
|
||||
}], onDone);
|
||||
} else {
|
||||
c.update({
|
||||
_id: versionKey,
|
||||
}, {
|
||||
_id: versionKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, onDone);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -874,14 +985,7 @@ class MongoClientInterface {
|
|||
*/
|
||||
putObjectNoVer(c, bucketName, objName, objVal, params, log, cb) {
|
||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||
c.update({
|
||||
_id: masterKey,
|
||||
}, {
|
||||
_id: masterKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, err => {
|
||||
const onDone = err => {
|
||||
if (err) {
|
||||
log.error(
|
||||
'putObjectNoVer: error putting obect with no versioning',
|
||||
|
@ -889,7 +993,26 @@ class MongoClientInterface {
|
|||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
};
|
||||
|
||||
if (this.bulked) {
|
||||
this.addToBucketBulk(bucketName, [{
|
||||
updateOne: {
|
||||
filter: { _id: masterKey },
|
||||
update: { $set: { _id: masterKey, value: objVal } },
|
||||
upsert: true,
|
||||
}
|
||||
}], onDone);
|
||||
} else {
|
||||
c.update({
|
||||
_id: masterKey,
|
||||
}, {
|
||||
_id: masterKey,
|
||||
value: objVal,
|
||||
}, {
|
||||
upsert: true,
|
||||
}, onDone);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -925,6 +1048,8 @@ class MongoClientInterface {
|
|||
putObject(bucketName, objName, objVal, params, log, cb) {
|
||||
MongoUtils.serialize(objVal);
|
||||
const c = this.getCollection(bucketName);
|
||||
// Create if needed the bulk object
|
||||
this.initBulk(bucketName, c);
|
||||
const _params = Object.assign({}, params);
|
||||
return this.getBucketVFormat(bucketName, log, (err, vFormat) => {
|
||||
if (err) {
|
||||
|
|
Loading…
Reference in New Issue