Compare commits
6 Commits
20e9fe4adb
...
9c86da55a4
Author | SHA1 | Date |
---|---|---|
williamlardier | 9c86da55a4 | |
williamlardier | a600a3d6fa | |
williamlardier | d26e3d7a7f | |
williamlardier | ff10ad38c1 | |
williamlardier | 12dad07986 | |
williamlardier | ec5bc8c933 |
|
@ -292,6 +292,19 @@ class MetadataWrapper {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getObjectsMD(bucketName, objNamesWithParams, log, cb) {
|
||||||
|
log.debug('getting object from metadata');
|
||||||
|
this.client.getObjects(bucketName, objNamesWithParams, log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
log.debug('error from metadata', { implName: this.implName,
|
||||||
|
err });
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
log.debug('object retrieved from metadata');
|
||||||
|
return cb(err, data);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
log.debug('deleting object from metadata');
|
log.debug('deleting object from metadata');
|
||||||
this.client.deleteObject(bucketName, objName, params, log, err => {
|
this.client.deleteObject(bucketName, objName, params, log, err => {
|
||||||
|
|
|
@ -1041,6 +1041,61 @@ class MongoClientInterface {
|
||||||
], cb);
|
], cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getObjects(bucketName, objects, log, callback) {
|
||||||
|
const c = this.getCollection(bucketName);
|
||||||
|
let vFormat = null;
|
||||||
|
// Function to process each document
|
||||||
|
const processDoc = (doc, objName, params, cb) => {
|
||||||
|
if (!doc && params && params.versionId) {
|
||||||
|
return cb(errors.NoSuchKey);
|
||||||
|
}
|
||||||
|
// If no master found then object is either non existent
|
||||||
|
// or last version is delete marker
|
||||||
|
if (!doc || doc.value.isPHD) {
|
||||||
|
return this.getLatestVersion(c, objName, vFormat, log, (err, doc) => {
|
||||||
|
if (err && !err.is.NoSuchKey) {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
return cb(null, doc || null);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
MongoUtils.unserialize(doc.value);
|
||||||
|
return cb(null, doc.value);
|
||||||
|
};
|
||||||
|
this.getBucketVFormat(bucketName, log, (err, _vFormat) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
vFormat = _vFormat;
|
||||||
|
// Create keys, maintaining the context with each key
|
||||||
|
const keysAndObjects = objects.map(({ key: objName, params }) => {
|
||||||
|
const _key = params && params.versionId
|
||||||
|
? formatVersionKey(objName, params.versionId, vFormat)
|
||||||
|
: formatMasterKey(objName, vFormat);
|
||||||
|
return { key: _key, objName, params };
|
||||||
|
});
|
||||||
|
// Extract keys and find documents
|
||||||
|
const keys = keysAndObjects.map(o => o.key);
|
||||||
|
return c.find({
|
||||||
|
_id: { $in: keys },
|
||||||
|
$or: [
|
||||||
|
{ 'value.deleted': { $exists: false } },
|
||||||
|
{ 'value.deleted': { $eq: false } },
|
||||||
|
],
|
||||||
|
}).toArray().then(docs => {
|
||||||
|
// Create a Map to quickly find docs by their keys
|
||||||
|
const docByKey = new Map(docs.map(doc => [doc._id, doc]));
|
||||||
|
// Process each document using associated context (objName, params)
|
||||||
|
async.mapLimit(keysAndObjects, 5, ({ key, objName, params }, cb) => {
|
||||||
|
const doc = docByKey.get(key);
|
||||||
|
processDoc(doc, objName, params, cb);
|
||||||
|
}, callback);
|
||||||
|
}).catch(err => {
|
||||||
|
callback(err);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function return the latest version of an object
|
* This function return the latest version of an object
|
||||||
* by getting all keys related to an object's versions, ordering them
|
* by getting all keys related to an object's versions, ordering them
|
||||||
|
@ -1199,7 +1254,7 @@ class MongoClientInterface {
|
||||||
'value.isPHD': true,
|
'value.isPHD': true,
|
||||||
'value.versionId': mst.versionId,
|
'value.versionId': mst.versionId,
|
||||||
};
|
};
|
||||||
this.internalDeleteObject(c, bucketName, masterKey, filter, log, err => {
|
this.internalDeleteObject(c, bucketName, masterKey, {}, filter, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
// the PHD master might get updated when a PUT is performed
|
// the PHD master might get updated when a PUT is performed
|
||||||
// before the repair is done, we don't want to return an error
|
// before the repair is done, we don't want to return an error
|
||||||
|
@ -1270,7 +1325,7 @@ class MongoClientInterface {
|
||||||
.then(() => next())
|
.then(() => next())
|
||||||
.catch(err => next(err)),
|
.catch(err => next(err)),
|
||||||
// delete version
|
// delete version
|
||||||
next => this.internalDeleteObject(c, bucketName, versionKey, {}, log,
|
next => this.internalDeleteObject(c, bucketName, versionKey, params, {}, log,
|
||||||
err => {
|
err => {
|
||||||
// we don't return an error in case we don't find
|
// we don't return an error in case we don't find
|
||||||
// a version as we expect this case when dealing with
|
// a version as we expect this case when dealing with
|
||||||
|
@ -1307,7 +1362,7 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
deleteObjectVerNotMaster(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
deleteObjectVerNotMaster(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
|
const versionKey = formatVersionKey(objName, params.versionId, params.vFormat);
|
||||||
this.internalDeleteObject(c, bucketName, versionKey, {}, log, err => {
|
this.internalDeleteObject(c, bucketName, versionKey, params, {}, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err.is.NoSuchKey) {
|
if (err.is.NoSuchKey) {
|
||||||
log.error(
|
log.error(
|
||||||
|
@ -1398,7 +1453,7 @@ class MongoClientInterface {
|
||||||
*/
|
*/
|
||||||
deleteObjectNoVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
deleteObjectNoVer(c, bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
const masterKey = formatMasterKey(objName, params.vFormat);
|
const masterKey = formatMasterKey(objName, params.vFormat);
|
||||||
this.internalDeleteObject(c, bucketName, masterKey, {}, log, err => {
|
this.internalDeleteObject(c, bucketName, masterKey, params, {}, log, err => {
|
||||||
if (err) {
|
if (err) {
|
||||||
// Should not return an error when no object is found
|
// Should not return an error when no object is found
|
||||||
if (err.is.NoSuchKey) {
|
if (err.is.NoSuchKey) {
|
||||||
|
@ -1420,6 +1475,7 @@ class MongoClientInterface {
|
||||||
* @param {Object} collection MongoDB collection
|
* @param {Object} collection MongoDB collection
|
||||||
* @param {string} bucketName bucket name
|
* @param {string} bucketName bucket name
|
||||||
* @param {string} key Key of the object to delete
|
* @param {string} key Key of the object to delete
|
||||||
|
* @param {object} params request params
|
||||||
* @param {object} filter additional query filters
|
* @param {object} filter additional query filters
|
||||||
* @param {Logger}log logger instance
|
* @param {Logger}log logger instance
|
||||||
* @param {Function} cb callback containing error
|
* @param {Function} cb callback containing error
|
||||||
|
@ -1427,8 +1483,24 @@ class MongoClientInterface {
|
||||||
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
|
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
internalDeleteObject(collection, bucketName, key, params, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
|
||||||
// filter used when finding and updating object
|
// filter used when deleting object
|
||||||
|
const deleteFilter = Object.assign({
|
||||||
|
_id: key,
|
||||||
|
}, filter);
|
||||||
|
|
||||||
|
if (params.shouldOnlyDelete) {
|
||||||
|
// If flag is true, directly delete object
|
||||||
|
return collection.deleteOne(deleteFilter)
|
||||||
|
.then(() => cb(null))
|
||||||
|
.catch(err => {
|
||||||
|
log.error('internalDeleteObject: error deleting object',
|
||||||
|
{ bucket: bucketName, object: key, error: err.message });
|
||||||
|
return cb(errors.InternalError);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// If flag is false, proceed with normal operations
|
||||||
const findFilter = Object.assign({
|
const findFilter = Object.assign({
|
||||||
_id: key,
|
_id: key,
|
||||||
$or: [
|
$or: [
|
||||||
|
@ -1436,12 +1508,13 @@ class MongoClientInterface {
|
||||||
{ 'value.deleted': { $eq: false } },
|
{ 'value.deleted': { $eq: false } },
|
||||||
],
|
],
|
||||||
}, filter);
|
}, filter);
|
||||||
// filter used when deleting object
|
|
||||||
const updateDeleteFilter = Object.assign({
|
const updateDeleteFilter = Object.assign({
|
||||||
'_id': key,
|
'_id': key,
|
||||||
'value.deleted': true,
|
'value.deleted': true,
|
||||||
}, filter);
|
}, filter);
|
||||||
async.waterfall([
|
|
||||||
|
return async.waterfall([
|
||||||
// Adding delete flag when getting the object
|
// Adding delete flag when getting the object
|
||||||
// to avoid having race conditions.
|
// to avoid having race conditions.
|
||||||
next => collection.findOneAndUpdate(findFilter, {
|
next => collection.findOneAndUpdate(findFilter, {
|
||||||
|
@ -1469,8 +1542,8 @@ class MongoClientInterface {
|
||||||
}),
|
}),
|
||||||
// We update the full object to get the whole object metadata
|
// We update the full object to get the whole object metadata
|
||||||
// in the oplog update event
|
// in the oplog update event
|
||||||
(objMetadata, next) => collection.bulkWrite([
|
(objMetadata, next) => {
|
||||||
{
|
const operations = [{
|
||||||
updateOne: {
|
updateOne: {
|
||||||
filter: updateDeleteFilter,
|
filter: updateDeleteFilter,
|
||||||
update: {
|
update: {
|
||||||
|
@ -1478,12 +1551,17 @@ class MongoClientInterface {
|
||||||
},
|
},
|
||||||
upsert: false,
|
upsert: false,
|
||||||
},
|
},
|
||||||
}, {
|
}];
|
||||||
|
|
||||||
|
// Add the delete operation if the shouldDelete flag is true
|
||||||
|
operations.push({
|
||||||
deleteOne: {
|
deleteOne: {
|
||||||
filter: updateDeleteFilter,
|
filter: updateDeleteFilter,
|
||||||
},
|
},
|
||||||
},
|
});
|
||||||
], { ordered: true }).then(() => next(null)).catch(() => next()),
|
|
||||||
|
collection.bulkWrite(operations, { ordered: true }).then(() => next(null)).catch(() => next());
|
||||||
|
},
|
||||||
], (err, res) => {
|
], (err, res) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
if (err.is.NoSuchKey) {
|
if (err.is.NoSuchKey) {
|
||||||
|
|
Loading…
Reference in New Issue