Compare commits

...

3 Commits

Author SHA1 Message Date
Jonathan Gramain 9ec289c561 S3C-3778: cleanup: remove ifMultipleBackend step from abortMultipartUpload
Remove a Zenko-only step that is not needed in 7.x branches
2021-05-04 16:46:19 -07:00
Jonathan Gramain 4286d0e50a S3C-3778: cleanup: remove ifMultipleBackend step in completeMultipartUpload
Remove a Zenko-only step that is not needed in 7.x branches.
2021-05-04 16:46:19 -07:00
Jonathan Gramain 19ed76cb40 bugfix: S3C-3778 unit test showing a duplicate version
Create a unit test showing that a duplicate version is created after a
complete-multipart-upload fails and is retried. Having more than one
version in this case creates shared data keys that cause a risk of
data loss once one of the versions is deleted.
2021-05-04 15:55:06 -07:00
4 changed files with 105 additions and 159 deletions

View File

@ -1,16 +1,13 @@
const async = require('async'); const async = require('async');
const { config } = require('../../../Config');
const constants = require('../../../../constants'); const constants = require('../../../../constants');
const data = require('../../../data/wrapper'); const data = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { metadataValidateBucketAndObj } = const { metadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils'); require('../../../metadata/metadataUtils');
const multipleBackendGateway = require('../../../data/multipleBackendGateway');
const services = require('../../../services'); const services = require('../../../services');
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log, function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
callback, request) { callback) {
const metadataValMPUparams = { const metadataValMPUparams = {
authInfo, authInfo,
bucketName, bucketName,
@ -47,61 +44,24 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
function checkMPUval(destBucket, next) { function checkMPUval(destBucket, next) {
metadataValParams.log = log; metadataValParams.log = log;
services.metadataValidateMultipart(metadataValParams, services.metadataValidateMultipart(metadataValParams,
(err, mpuBucket, mpuOverviewObj) => { (err, mpuBucket) => {
if (err) { if (err) {
return next(err, destBucket); return next(err, destBucket);
} }
return next(err, mpuBucket, mpuOverviewObj, destBucket); return next(null, mpuBucket, destBucket);
}); });
}, },
function ifMultipleBackend(mpuBucket, mpuOverviewObj, destBucket, function getPartLocations(mpuBucket, destBucket, next) {
next) {
if (config.backends.data === 'multiple') {
let location;
// if controlling location constraint is not stored in object
// metadata, mpu was initiated in legacy S3C, so need to
// determine correct location constraint
if (!mpuOverviewObj.controllingLocationConstraint) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err, destBucket);
});
}
location = backendInfoObj.controllingLC;
} else {
location = mpuOverviewObj.controllingLocationConstraint;
}
return multipleBackendGateway.abortMPU(objectKey, uploadId,
location, bucketName, log, (err, skipDataDelete) => {
if (err) {
return next(err, destBucket);
}
return next(null, mpuBucket, destBucket,
skipDataDelete);
});
}
return next(null, mpuBucket, destBucket, false);
},
function getPartLocations(mpuBucket, destBucket, skipDataDelete,
next) {
services.getMPUparts(mpuBucket.getName(), uploadId, log, services.getMPUparts(mpuBucket.getName(), uploadId, log,
(err, result) => { (err, result) => {
if (err) { if (err) {
return next(err, destBucket); return next(err, destBucket);
} }
const storedParts = result.Contents; const storedParts = result.Contents;
return next(null, mpuBucket, storedParts, destBucket, return next(null, mpuBucket, storedParts, destBucket);
skipDataDelete);
}); });
}, },
function deleteData(mpuBucket, storedParts, destBucket, function deleteData(mpuBucket, storedParts, destBucket, next) {
skipDataDelete, next) {
// for Azure we do not need to delete data
if (skipDataDelete) {
return next(null, mpuBucket, storedParts, destBucket);
}
// The locations were sent to metadata as an array // The locations were sent to metadata as an array
// under partLocations. Pull the partLocations. // under partLocations. Pull the partLocations.
let locations = storedParts.map(item => item.value.partLocations); let locations = storedParts.map(item => item.value.partLocations);

View File

@ -7,8 +7,6 @@ const { pushMetric } = require('../utapi/utilities');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo'); const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } = const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
require('./apiUtils/object/processMpuParts'); require('./apiUtils/object/processMpuParts');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const data = require('../data/wrapper'); const data = require('../data/wrapper');
const collectCorsHeaders = require('../utilities/collectCorsHeaders'); const collectCorsHeaders = require('../utilities/collectCorsHeaders');
@ -18,9 +16,6 @@ const { versioningPreprocessing, checkQueryVersionId }
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const services = require('../services'); const services = require('../services');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils'); const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { skipMpuPartProcessing } = require('../data/external/utils');
const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const locationKeysSanityCheck const locationKeysSanityCheck
= require('./apiUtils/object/locationKeysSanityCheck'); = require('./apiUtils/object/locationKeysSanityCheck');
@ -159,13 +154,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
if (err) { if (err) {
return next(err, destBucket); return next(err, destBucket);
} }
const location = storedMetadata.controllingLocationConstraint;
return next(null, destBucket, objMD, mpuBucket, jsonList, return next(null, destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey); storedMetadata, mpuOverviewKey);
}); });
}, },
function retrieveParts(destBucket, objMD, mpuBucket, jsonList, function retrieveParts(destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey, next) { storedMetadata, mpuOverviewKey, next) {
return services.getMPUparts(mpuBucket.getName(), uploadId, log, return services.getMPUparts(mpuBucket.getName(), uploadId, log,
(err, result) => { (err, result) => {
if (err) { if (err) {
@ -173,79 +167,22 @@ function completeMultipartUpload(authInfo, request, log, callback) {
} }
const storedParts = result.Contents; const storedParts = result.Contents;
return next(null, destBucket, objMD, mpuBucket, storedParts, return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey); jsonList, storedMetadata, mpuOverviewKey);
}); });
}, },
function ifMultipleBackend(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
if (config.backends.data === 'multiple') {
// if mpu was initiated in legacy version
if (location === undefined) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err);
});
}
// eslint-disable-next-line no-param-reassign
location = backendInfoObj.controllingLC;
}
const mdInfo = { storedParts, mpuOverviewKey, splitter };
return multipleBackendGateway.completeMPU(objectKey,
uploadId, location, jsonList, mdInfo, bucketName, null, null,
log, (err, completeObjData) => {
if (err) {
return next(err, destBucket);
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData,
mpuOverviewKey);
});
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, null, mpuOverviewKey);
},
function validateAndFilterParts(destBucket, objMD, mpuBucket, function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey, storedParts, jsonList, storedMetadata, mpuOverviewKey,
next) { next) {
if (completeObjData) {
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
completeObjData.filteredPartsObj);
}
const filteredPartsObj = validateAndFilterMpuParts(storedParts, const filteredPartsObj = validateAndFilterMpuParts(storedParts,
jsonList, mpuOverviewKey, splitter, log); jsonList, mpuOverviewKey, splitter, log);
if (filteredPartsObj.error) { if (filteredPartsObj.error) {
return next(filteredPartsObj.error, destBucket); return next(filteredPartsObj.error, destBucket);
} }
return next(null, destBucket, objMD, mpuBucket, storedParts, return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey, jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj);
filteredPartsObj);
}, },
function processParts(destBucket, objMD, mpuBucket, storedParts, function processParts(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData, mpuOverviewKey, jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj, next) {
filteredPartsObj, next) {
// if mpu was completed on backend that stored mpu MD externally,
// skip MD processing steps
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
const dataLocations = [
{
key: completeObjData.key,
size: completeObjData.contentLength,
start: 0,
dataStoreVersionId: completeObjData.dataStoreVersionId,
dataStoreName: storedMetadata.dataStoreName,
dataStoreETag: completeObjData.eTag,
dataStoreType: completeObjData.dataStoreType,
},
];
const calculatedSize = completeObjData.contentLength;
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
completeObjData.eTag, calculatedSize, dataLocations,
[mpuOverviewKey], null, completeObjData);
}
const partsInfo = const partsInfo =
generateMpuPartStorageInfo(filteredPartsObj.partList); generateMpuPartStorageInfo(filteredPartsObj.partList);
if (partsInfo.error) { if (partsInfo.error) {
@ -254,28 +191,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
const { keysToDelete, extraPartLocations } = filteredPartsObj; const { keysToDelete, extraPartLocations } = filteredPartsObj;
const { aggregateETag, dataLocations, calculatedSize } = partsInfo; const { aggregateETag, dataLocations, calculatedSize } = partsInfo;
if (completeObjData) {
const dataLocations = [
{
key: completeObjData.key,
size: calculatedSize,
start: 0,
dataStoreName: storedMetadata.dataStoreName,
dataStoreETag: aggregateETag,
dataStoreType: completeObjData.dataStoreType,
},
];
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData);
}
return next(null, destBucket, objMD, mpuBucket, storedMetadata, return next(null, destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete, aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, null); extraPartLocations);
}, },
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata, function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
aggregateETag, calculatedSize, dataLocations, keysToDelete, aggregateETag, calculatedSize, dataLocations, keysToDelete,
extraPartLocations, completeObjData, next) { extraPartLocations, next) {
const metaHeaders = {}; const metaHeaders = {};
const keysNotNeeded = const keysNotNeeded =
['initiator', 'partLocations', 'key', ['initiator', 'partLocations', 'key',
@ -339,13 +261,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
return next(null, destBucket, dataLocations, return next(null, destBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
objMD, extraPartLocations, pseudoCipherBundle, objMD, extraPartLocations, pseudoCipherBundle,
dataToDelete, completeObjData); dataToDelete);
}); });
}, },
function storeAsNewObj(destinationBucket, dataLocations, function storeAsNewObj(destinationBucket, dataLocations,
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD, metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
extraPartLocations, pseudoCipherBundle, dataToDelete, extraPartLocations, pseudoCipherBundle, dataToDelete, next) {
completeObjData, next) {
return services.metadataStoreObject(destinationBucket.getName(), return services.metadataStoreObject(destinationBucket.getName(),
dataLocations, pseudoCipherBundle, metaStoreParams, dataLocations, pseudoCipherBundle, metaStoreParams,
(err, res) => { (err, res) => {

View File

@ -419,7 +419,7 @@ const services = {
/** /**
* Checks whether bucket exists, multipart upload * Checks whether bucket exists, multipart upload
* has been initatied and the user is authorized * has been initiated and the user is authorized
* @param {object} params - custom built object containing * @param {object} params - custom built object containing
* bucket name, uploadId, authInfo etc. * bucket name, uploadId, authInfo etc.
* @param {function} cb - callback containing error and * @param {function} cb - callback containing error and

View File

@ -18,6 +18,7 @@ const { ds } = require('../../../lib/data/in_memory/backend');
const initiateMultipartUpload const initiateMultipartUpload
= require('../../../lib/api/initiateMultipartUpload'); = require('../../../lib/api/initiateMultipartUpload');
const { metadata } = require('../../../lib/metadata/in_memory/metadata'); const { metadata } = require('../../../lib/metadata/in_memory/metadata');
const metadataBackend = require('../../../lib/metadata/in_memory/backend');
const multipartDelete = require('../../../lib/api/multipartDelete'); const multipartDelete = require('../../../lib/api/multipartDelete');
const objectPutPart = require('../../../lib/api/objectPutPart'); const objectPutPart = require('../../../lib/api/objectPutPart');
const DummyRequest = require('../DummyRequest'); const DummyRequest = require('../DummyRequest');
@ -1695,28 +1696,9 @@ describe('complete mpu with versioning', () => {
versioningTestUtils.createPutObjectRequest(bucketName, objectKey, versioningTestUtils.createPutObjectRequest(bucketName, objectKey,
data)); data));
before(done => { beforeEach(done => {
cleanup(); cleanup();
async.series([ bucketPut(authInfo, bucketPutRequest, log, done);
callback => bucketPut(authInfo, bucketPutRequest, log,
callback),
// putting null version: put obj before versioning configured
callback => objectPut(authInfo, testPutObjectRequests[0],
undefined, log, callback),
callback => bucketPutVersioning(authInfo,
enableVersioningRequest, log, callback),
// put another version:
callback => objectPut(authInfo, testPutObjectRequests[1],
undefined, log, callback),
callback => bucketPutVersioning(authInfo,
suspendVersioningRequest, log, callback),
], err => {
if (err) {
return done(err);
}
versioningTestUtils.assertDataStoreValues(ds, objData.slice(0, 2));
return done();
});
}); });
after(done => { after(done => {
@ -1727,8 +1709,21 @@ describe('complete mpu with versioning', () => {
it('should delete null version when creating new null version, ' + it('should delete null version when creating new null version, ' +
'even when null version is not the latest version', done => { 'even when null version is not the latest version', done => {
async.waterfall([ async.waterfall([
next => // putting null version: put obj before versioning configured
initiateMultipartUpload(authInfo, initiateRequest, log, next), next => objectPut(authInfo, testPutObjectRequests[0],
undefined, log, err => next(err)),
next => bucketPutVersioning(authInfo,
enableVersioningRequest, log, err => next(err)),
// put another version:
next => objectPut(authInfo, testPutObjectRequests[1],
undefined, log, err => next(err)),
next => bucketPutVersioning(authInfo,
suspendVersioningRequest, log, err => next(err)),
next => {
versioningTestUtils.assertDataStoreValues(
ds, objData.slice(0, 2));
initiateMultipartUpload(authInfo, initiateRequest, log, next);
},
(result, corsHeaders, next) => parseString(result, next), (result, corsHeaders, next) => parseString(result, next),
(json, next) => { (json, next) => {
const partBody = objData[2]; const partBody = objData[2];
@ -1754,4 +1749,74 @@ describe('complete mpu with versioning', () => {
}); });
}); });
}); });
it('should finish deleting metadata on completeMultipartUpload retry',
done => {
let origDeleteObject;
async.waterfall([
next => bucketPutVersioning(authInfo,
enableVersioningRequest, log, err => next(err)),
next =>
initiateMultipartUpload(authInfo, initiateRequest, log, next),
(result, corsHeaders, next) => parseString(result, next),
(json, next) => {
const partBody = objData[2];
const testUploadId =
json.InitiateMultipartUploadResult.UploadId[0];
const partRequest = _createPutPartRequest(testUploadId, 1,
partBody);
objectPutPart(authInfo, partRequest, undefined, log,
(err, eTag) => next(err, eTag, testUploadId));
},
(eTag, testUploadId, next) => {
origDeleteObject = metadataBackend.deleteObject;
metadataBackend.deleteObject = (
bucketName, objName, params, log, cb) => {
// prevent deletions from MPU bucket only
if (bucketName === mpuBucket) {
return process.nextTick(
() => cb(errors.InternalError));
}
return origDeleteObject(
bucketName, objName, params, log, cb);
};
const parts = [{ partNumber: 1, eTag }];
const completeRequest = _createCompleteMpuRequest(
testUploadId, parts);
completeMultipartUpload(authInfo, completeRequest, log, err => {
// expect a failure here because we could not
// remove the overview key
assert.strictEqual(err, errors.InternalError);
next(null, eTag, testUploadId);
});
},
(eTag, testUploadId, next) => {
// allow MPU bucket metadata deletions to happen again
metadataBackend.deleteObject = origDeleteObject;
// retry the completeMultipartUpload with the same
// metadata, as an application would normally do after
// a failure
const parts = [{ partNumber: 1, eTag }];
const completeRequest = _createCompleteMpuRequest(
testUploadId, parts);
completeMultipartUpload(authInfo, completeRequest, log, next);
},
], err => {
assert.ifError(err);
let nbVersions = 0;
for (const key of metadata.keyMaps.get(bucketName).keys()) {
if (key !== objectKey && key.startsWith(objectKey)) {
nbVersions += 1;
}
}
// There should be only one version of the object, since
// the second call should not have created a new version
assert.strictEqual(nbVersions, 1);
for (const key of metadata.keyMaps.get(mpuBucket).keys()) {
assert.fail('There should be no more keys in MPU bucket, ' +
`found "${key}"`);
}
done();
});
});
}); });