Compare commits

...

1 Commits

Author SHA1 Message Date
williamlardier f08d066d82
ARSN-345: support batching and optimize 2023-06-07 18:25:24 +02:00
2 changed files with 103 additions and 12 deletions

View File

@ -292,6 +292,19 @@ class MetadataWrapper {
}); });
} }
getObjectsMD(bucketName, objNamesWithParams, log, cb) {
log.debug('getting object from metadata');
this.client.getObjects(bucketName, objNamesWithParams, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
err });
return cb(err);
}
log.debug('object retrieved from metadata');
return cb(err, data);
});
}
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') { deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
log.debug('deleting object from metadata'); log.debug('deleting object from metadata');
this.client.deleteObject(bucketName, objName, params, log, err => { this.client.deleteObject(bucketName, objName, params, log, err => {

View File

@ -1041,6 +1041,61 @@ class MongoClientInterface {
], cb); ], cb);
} }
getObjects(bucketName, objects, log, callback) {
const c = this.getCollection(bucketName);
let vFormat = null;
// Function to process each document
const processDoc = (doc, objName, params, cb) => {
if (!doc && params && params.versionId) {
return cb(errors.NoSuchKey);
}
// If no master found then object is either non existent
// or last version is delete marker
if (!doc || doc.value.isPHD) {
return this.getLatestVersion(c, objName, vFormat, log, (err, doc) => {
if (err && !err.is.NoSuchKey) {
return cb(err);
}
return cb(null, doc || null);
});
}
MongoUtils.unserialize(doc.value);
return cb(null, doc.value);
};
this.getBucketVFormat(bucketName, log, (err, _vFormat) => {
if (err) {
return callback(err);
}
vFormat = _vFormat;
// Create keys, maintaining the context with each key
const keysAndObjects = objects.map(({ key: objName, params }) => {
const _key = params && params.versionId
? formatVersionKey(objName, params.versionId, vFormat)
: formatMasterKey(objName, vFormat);
return { key: _key, objName, params };
});
// Extract keys and find documents
const keys = keysAndObjects.map(o => o.key);
return c.find({
_id: { $in: keys },
$or: [
{ 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } },
],
}).toArray().then(docs => {
// Create a Map to quickly find docs by their keys
const docByKey = new Map(docs.map(doc => [doc._id, doc]));
// Process each document using associated context (objName, params)
async.mapLimit(keysAndObjects, 5, ({ key, objName, params }, cb) => {
const doc = docByKey.get(key);
processDoc(doc, objName, params, cb);
}, callback);
}).catch(err => {
callback(err);
});
});
}
/** /**
* This function return the latest version of an object * This function return the latest version of an object
* by getting all keys related to an object's versions, ordering them * by getting all keys related to an object's versions, ordering them
@ -1279,7 +1334,7 @@ class MongoClientInterface {
return next(null); return next(null);
} }
return next(err); return next(err);
}, originOp), }, originOp, params),
], err => { ], err => {
if (err) { if (err) {
log.error( log.error(
@ -1321,7 +1376,7 @@ class MongoClientInterface {
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
}, originOp); }, originOp, params);
} }
/** /**
@ -1410,7 +1465,7 @@ class MongoClientInterface {
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return cb(null); return cb(null);
}, originOp); }, originOp, params);
} }
/** /**
@ -1425,10 +1480,27 @@ class MongoClientInterface {
* @param {Function} cb callback containing error * @param {Function} cb callback containing error
* and BulkWriteResult * and BulkWriteResult
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation * @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
* @param {object} [params] request params
* @return {undefined} * @return {undefined}
*/ */
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') { internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete', params = null) {
// filter used when finding and updating object // filter used when deleting object
const deleteFilter = Object.assign({
_id: key,
}, filter);
if (params?.shouldOnlyDelete) {
// If flag is true, directly delete object
return collection.deleteOne(deleteFilter)
.then(() => cb(null))
.catch(err => {
log.error('internalDeleteObject: error deleting object',
{ bucket: bucketName, object: key, error: err.message });
return cb(errors.InternalError);
});
}
// If flag is false, proceed with normal operations
const findFilter = Object.assign({ const findFilter = Object.assign({
_id: key, _id: key,
$or: [ $or: [
@ -1436,12 +1508,13 @@ class MongoClientInterface {
{ 'value.deleted': { $eq: false } }, { 'value.deleted': { $eq: false } },
], ],
}, filter); }, filter);
// filter used when deleting object
const updateDeleteFilter = Object.assign({ const updateDeleteFilter = Object.assign({
'_id': key, '_id': key,
'value.deleted': true, 'value.deleted': true,
}, filter); }, filter);
async.waterfall([
return async.waterfall([
// Adding delete flag when getting the object // Adding delete flag when getting the object
// to avoid having race conditions. // to avoid having race conditions.
next => collection.findOneAndUpdate(findFilter, { next => collection.findOneAndUpdate(findFilter, {
@ -1469,8 +1542,8 @@ class MongoClientInterface {
}), }),
// We update the full object to get the whole object metadata // We update the full object to get the whole object metadata
// in the oplog update event // in the oplog update event
(objMetadata, next) => collection.bulkWrite([ (objMetadata, next) => {
{ const operations = [{
updateOne: { updateOne: {
filter: updateDeleteFilter, filter: updateDeleteFilter,
update: { update: {
@ -1478,12 +1551,17 @@ class MongoClientInterface {
}, },
upsert: false, upsert: false,
}, },
}, { }];
// Add the delete operation if the shouldDelete flag is true
operations.push({
deleteOne: { deleteOne: {
filter: updateDeleteFilter, filter: updateDeleteFilter,
}, },
}, });
], { ordered: true }).then(() => next(null)).catch(() => next()),
collection.bulkWrite(operations, { ordered: true }).then(() => next(null)).catch(() => next());
},
], (err, res) => { ], (err, res) => {
if (err) { if (err) {
if (err.is.NoSuchKey) { if (err.is.NoSuchKey) {