Compare commits


3 Commits

Author SHA1 Message Date
Jonathan Gramain 9ec289c561 S3C-3778: cleanup: remove ifMultipleBackend step from abortMultipartUpload
Remove a Zenko-only step that is not needed in 7.x branches
2021-05-04 16:46:19 -07:00
Jonathan Gramain 4286d0e50a S3C-3778: cleanup: remove ifMultipleBackend step in completeMultipartUpload
Remove a Zenko-only step that is not needed in 7.x branches.
2021-05-04 16:46:19 -07:00
Jonathan Gramain 19ed76cb40 bugfix: S3C-3778 unit test showing a duplicate version
Create a unit test showing that a duplicate version is created after a
complete-multipart-upload fails and is retried. Having more than one
version in this case creates shared data keys that cause a risk of
data loss once one of the versions is deleted.
2021-05-04 15:55:06 -07:00
4 changed files with 105 additions and 159 deletions
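The data-loss scenario described in commit 19ed76cb40 can be illustrated with a minimal standalone sketch (plain Node.js, not CloudServer code; the key names and metadata shapes here are invented for illustration): if a retried complete-multipart-upload writes a second version whose metadata points at the same data keys as the first, deleting either version also removes data that the surviving version still references.

// Minimal sketch of the shared-data-key risk: two versions created by an
// original completeMultipartUpload call and its retry both reference the
// same underlying data key (all names are hypothetical).
const dataStore = new Map([['mpu-part-key-1', 'part payload']]);
const versions = new Map([
    ['objectKey\u0000v1', { dataKeys: ['mpu-part-key-1'] }],
    ['objectKey\u0000v2', { dataKeys: ['mpu-part-key-1'] }], // duplicate from the retry
]);

// Naive version deletion: drop the version metadata, then delete every
// data key it lists, without checking for other referencing versions.
function deleteVersion(versionKey) {
    const versionMD = versions.get(versionKey);
    versions.delete(versionKey);
    versionMD.dataKeys.forEach(key => dataStore.delete(key));
}

deleteVersion('objectKey\u0000v1');

// The surviving version now points at data that no longer exists.
const survivor = versions.get('objectKey\u0000v2');
console.log(survivor.dataKeys.map(key => dataStore.has(key))); // [ false ]

The unit test added in this compare asserts the opposite behavior: after the failed call is retried, only one version of the object exists, so no two version records end up sharing data keys.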

View File

@@ -1,16 +1,13 @@
const async = require('async');
const { config } = require('../../../Config');
const constants = require('../../../../constants');
const data = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { metadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const multipleBackendGateway = require('../../../data/multipleBackendGateway');
const services = require('../../../services');
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
callback, request) {
callback) {
const metadataValMPUparams = {
authInfo,
bucketName,
@@ -47,61 +44,24 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
function checkMPUval(destBucket, next) {
metadataValParams.log = log;
services.metadataValidateMultipart(metadataValParams,
(err, mpuBucket, mpuOverviewObj) => {
(err, mpuBucket) => {
if (err) {
return next(err, destBucket);
}
return next(err, mpuBucket, mpuOverviewObj, destBucket);
return next(null, mpuBucket, destBucket);
});
},
function ifMultipleBackend(mpuBucket, mpuOverviewObj, destBucket,
next) {
if (config.backends.data === 'multiple') {
let location;
// if controlling location constraint is not stored in object
// metadata, mpu was initiated in legacy S3C, so need to
// determine correct location constraint
if (!mpuOverviewObj.controllingLocationConstraint) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err, destBucket);
});
}
location = backendInfoObj.controllingLC;
} else {
location = mpuOverviewObj.controllingLocationConstraint;
}
return multipleBackendGateway.abortMPU(objectKey, uploadId,
location, bucketName, log, (err, skipDataDelete) => {
if (err) {
return next(err, destBucket);
}
return next(null, mpuBucket, destBucket,
skipDataDelete);
});
}
return next(null, mpuBucket, destBucket, false);
},
function getPartLocations(mpuBucket, destBucket, skipDataDelete,
next) {
function getPartLocations(mpuBucket, destBucket, next) {
services.getMPUparts(mpuBucket.getName(), uploadId, log,
(err, result) => {
if (err) {
return next(err, destBucket);
}
const storedParts = result.Contents;
return next(null, mpuBucket, storedParts, destBucket,
skipDataDelete);
return next(null, mpuBucket, storedParts, destBucket);
});
},
function deleteData(mpuBucket, storedParts, destBucket,
skipDataDelete, next) {
// for Azure we do not need to delete data
if (skipDataDelete) {
return next(null, mpuBucket, storedParts, destBucket);
}
function deleteData(mpuBucket, storedParts, destBucket, next) {
// The locations were sent to metadata as an array
// under partLocations. Pull the partLocations.
let locations = storedParts.map(item => item.value.partLocations);

View File

@@ -7,8 +7,6 @@ const { pushMetric } = require('../utapi/utilities');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
require('./apiUtils/object/processMpuParts');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const data = require('../data/wrapper');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
@@ -18,9 +16,6 @@ const { versioningPreprocessing, checkQueryVersionId }
const metadata = require('../metadata/wrapper');
const services = require('../services');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { skipMpuPartProcessing } = require('../data/external/utils');
const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const locationKeysSanityCheck
= require('./apiUtils/object/locationKeysSanityCheck');
@@ -159,13 +154,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
if (err) {
return next(err, destBucket);
}
const location = storedMetadata.controllingLocationConstraint;
return next(null, destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey);
storedMetadata, mpuOverviewKey);
});
},
function retrieveParts(destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey, next) {
storedMetadata, mpuOverviewKey, next) {
return services.getMPUparts(mpuBucket.getName(), uploadId, log,
(err, result) => {
if (err) {
@@ -173,79 +167,22 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
const storedParts = result.Contents;
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey);
jsonList, storedMetadata, mpuOverviewKey);
});
},
function ifMultipleBackend(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
if (config.backends.data === 'multiple') {
// if mpu was initiated in legacy version
if (location === undefined) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err);
});
}
// eslint-disable-next-line no-param-reassign
location = backendInfoObj.controllingLC;
}
const mdInfo = { storedParts, mpuOverviewKey, splitter };
return multipleBackendGateway.completeMPU(objectKey,
uploadId, location, jsonList, mdInfo, bucketName, null, null,
log, (err, completeObjData) => {
if (err) {
return next(err, destBucket);
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData,
mpuOverviewKey);
});
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, null, mpuOverviewKey);
},
function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
storedParts, jsonList, storedMetadata, mpuOverviewKey,
next) {
if (completeObjData) {
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
completeObjData.filteredPartsObj);
}
const filteredPartsObj = validateAndFilterMpuParts(storedParts,
jsonList, mpuOverviewKey, splitter, log);
if (filteredPartsObj.error) {
return next(filteredPartsObj.error, destBucket);
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj);
jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj);
},
function processParts(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
filteredPartsObj, next) {
// if mpu was completed on backend that stored mpu MD externally,
// skip MD processing steps
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
const dataLocations = [
{
key: completeObjData.key,
size: completeObjData.contentLength,
start: 0,
dataStoreVersionId: completeObjData.dataStoreVersionId,
dataStoreName: storedMetadata.dataStoreName,
dataStoreETag: completeObjData.eTag,
dataStoreType: completeObjData.dataStoreType,
},
];
const calculatedSize = completeObjData.contentLength;
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
completeObjData.eTag, calculatedSize, dataLocations,
[mpuOverviewKey], null, completeObjData);
}
jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj, next) {
const partsInfo =
generateMpuPartStorageInfo(filteredPartsObj.partList);
if (partsInfo.error) {
@@ -254,28 +191,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
const { keysToDelete, extraPartLocations } = filteredPartsObj;
const { aggregateETag, dataLocations, calculatedSize } = partsInfo;
if (completeObjData) {
const dataLocations = [
{
key: completeObjData.key,
size: calculatedSize,
start: 0,
dataStoreName: storedMetadata.dataStoreName,
dataStoreETag: aggregateETag,
dataStoreType: completeObjData.dataStoreType,
},
];
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData);
}
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, null);
extraPartLocations);
},
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData, next) {
extraPartLocations, next) {
const metaHeaders = {};
const keysNotNeeded =
['initiator', 'partLocations', 'key',
@@ -339,13 +261,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle,
dataToDelete, completeObjData);
dataToDelete);
});
},
function storeAsNewObj(destinationBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
extraPartLocations, pseudoCipherBundle, dataToDelete,
completeObjData, next) {
extraPartLocations, pseudoCipherBundle, dataToDelete, next) {
return services.metadataStoreObject(destinationBucket.getName(),
dataLocations, pseudoCipherBundle, metaStoreParams,
(err, res) => {

View File

@@ -419,7 +419,7 @@ const services = {
/**
* Checks whether bucket exists, multipart upload
* has been initatied and the user is authorized
* has been initiated and the user is authorized
* @param {object} params - custom built object containing
* bucket name, uploadId, authInfo etc.
* @param {function} cb - callback containing error and

View File

@@ -18,6 +18,7 @@ const { ds } = require('../../../lib/data/in_memory/backend');
const initiateMultipartUpload
= require('../../../lib/api/initiateMultipartUpload');
const { metadata } = require('../../../lib/metadata/in_memory/metadata');
const metadataBackend = require('../../../lib/metadata/in_memory/backend');
const multipartDelete = require('../../../lib/api/multipartDelete');
const objectPutPart = require('../../../lib/api/objectPutPart');
const DummyRequest = require('../DummyRequest');
@@ -1695,28 +1696,9 @@ describe('complete mpu with versioning', () => {
versioningTestUtils.createPutObjectRequest(bucketName, objectKey,
data));
before(done => {
beforeEach(done => {
cleanup();
async.series([
callback => bucketPut(authInfo, bucketPutRequest, log,
callback),
// putting null version: put obj before versioning configured
callback => objectPut(authInfo, testPutObjectRequests[0],
undefined, log, callback),
callback => bucketPutVersioning(authInfo,
enableVersioningRequest, log, callback),
// put another version:
callback => objectPut(authInfo, testPutObjectRequests[1],
undefined, log, callback),
callback => bucketPutVersioning(authInfo,
suspendVersioningRequest, log, callback),
], err => {
if (err) {
return done(err);
}
versioningTestUtils.assertDataStoreValues(ds, objData.slice(0, 2));
return done();
});
bucketPut(authInfo, bucketPutRequest, log, done);
});
after(done => {
@@ -1727,8 +1709,21 @@
it('should delete null version when creating new null version, ' +
'even when null version is not the latest version', done => {
async.waterfall([
next =>
initiateMultipartUpload(authInfo, initiateRequest, log, next),
// putting null version: put obj before versioning configured
next => objectPut(authInfo, testPutObjectRequests[0],
undefined, log, err => next(err)),
next => bucketPutVersioning(authInfo,
enableVersioningRequest, log, err => next(err)),
// put another version:
next => objectPut(authInfo, testPutObjectRequests[1],
undefined, log, err => next(err)),
next => bucketPutVersioning(authInfo,
suspendVersioningRequest, log, err => next(err)),
next => {
versioningTestUtils.assertDataStoreValues(
ds, objData.slice(0, 2));
initiateMultipartUpload(authInfo, initiateRequest, log, next);
},
(result, corsHeaders, next) => parseString(result, next),
(json, next) => {
const partBody = objData[2];
@@ -1754,4 +1749,74 @@
});
});
});
it('should finish deleting metadata on completeMultipartUpload retry',
done => {
let origDeleteObject;
async.waterfall([
next => bucketPutVersioning(authInfo,
enableVersioningRequest, log, err => next(err)),
next =>
initiateMultipartUpload(authInfo, initiateRequest, log, next),
(result, corsHeaders, next) => parseString(result, next),
(json, next) => {
const partBody = objData[2];
const testUploadId =
json.InitiateMultipartUploadResult.UploadId[0];
const partRequest = _createPutPartRequest(testUploadId, 1,
partBody);
objectPutPart(authInfo, partRequest, undefined, log,
(err, eTag) => next(err, eTag, testUploadId));
},
(eTag, testUploadId, next) => {
origDeleteObject = metadataBackend.deleteObject;
metadataBackend.deleteObject = (
bucketName, objName, params, log, cb) => {
// prevent deletions from MPU bucket only
if (bucketName === mpuBucket) {
return process.nextTick(
() => cb(errors.InternalError));
}
return origDeleteObject(
bucketName, objName, params, log, cb);
};
const parts = [{ partNumber: 1, eTag }];
const completeRequest = _createCompleteMpuRequest(
testUploadId, parts);
completeMultipartUpload(authInfo, completeRequest, log, err => {
// expect a failure here because we could not
// remove the overview key
assert.strictEqual(err, errors.InternalError);
next(null, eTag, testUploadId);
});
},
(eTag, testUploadId, next) => {
// allow MPU bucket metadata deletions to happen again
metadataBackend.deleteObject = origDeleteObject;
// retry the completeMultipartUpload with the same
// metadata, as an application would normally do after
// a failure
const parts = [{ partNumber: 1, eTag }];
const completeRequest = _createCompleteMpuRequest(
testUploadId, parts);
completeMultipartUpload(authInfo, completeRequest, log, next);
},
], err => {
assert.ifError(err);
let nbVersions = 0;
for (const key of metadata.keyMaps.get(bucketName).keys()) {
if (key !== objectKey && key.startsWith(objectKey)) {
nbVersions += 1;
}
}
// There should be only one version of the object, since
// the second call should not have created a new version
assert.strictEqual(nbVersions, 1);
for (const key of metadata.keyMaps.get(mpuBucket).keys()) {
assert.fail('There should be no more keys in MPU bucket, ' +
`found "${key}"`);
}
done();
});
});
});