Compare commits
3 Commits
developmen
...
user/jonat
Author | SHA1 | Date |
---|---|---|
Jonathan Gramain | 9ec289c561 | |
Jonathan Gramain | 4286d0e50a | |
Jonathan Gramain | 19ed76cb40 |
|
@ -1,16 +1,13 @@
|
|||
const async = require('async');
|
||||
|
||||
const { config } = require('../../../Config');
|
||||
const constants = require('../../../../constants');
|
||||
const data = require('../../../data/wrapper');
|
||||
const locationConstraintCheck = require('../object/locationConstraintCheck');
|
||||
const { metadataValidateBucketAndObj } =
|
||||
require('../../../metadata/metadataUtils');
|
||||
const multipleBackendGateway = require('../../../data/multipleBackendGateway');
|
||||
const services = require('../../../services');
|
||||
|
||||
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
|
||||
callback, request) {
|
||||
callback) {
|
||||
const metadataValMPUparams = {
|
||||
authInfo,
|
||||
bucketName,
|
||||
|
@ -47,61 +44,24 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
|
|||
function checkMPUval(destBucket, next) {
|
||||
metadataValParams.log = log;
|
||||
services.metadataValidateMultipart(metadataValParams,
|
||||
(err, mpuBucket, mpuOverviewObj) => {
|
||||
(err, mpuBucket) => {
|
||||
if (err) {
|
||||
return next(err, destBucket);
|
||||
}
|
||||
return next(err, mpuBucket, mpuOverviewObj, destBucket);
|
||||
return next(null, mpuBucket, destBucket);
|
||||
});
|
||||
},
|
||||
function ifMultipleBackend(mpuBucket, mpuOverviewObj, destBucket,
|
||||
next) {
|
||||
if (config.backends.data === 'multiple') {
|
||||
let location;
|
||||
// if controlling location constraint is not stored in object
|
||||
// metadata, mpu was initiated in legacy S3C, so need to
|
||||
// determine correct location constraint
|
||||
if (!mpuOverviewObj.controllingLocationConstraint) {
|
||||
const backendInfoObj = locationConstraintCheck(request,
|
||||
null, destBucket, log);
|
||||
if (backendInfoObj.err) {
|
||||
return process.nextTick(() => {
|
||||
next(backendInfoObj.err, destBucket);
|
||||
});
|
||||
}
|
||||
location = backendInfoObj.controllingLC;
|
||||
} else {
|
||||
location = mpuOverviewObj.controllingLocationConstraint;
|
||||
}
|
||||
return multipleBackendGateway.abortMPU(objectKey, uploadId,
|
||||
location, bucketName, log, (err, skipDataDelete) => {
|
||||
if (err) {
|
||||
return next(err, destBucket);
|
||||
}
|
||||
return next(null, mpuBucket, destBucket,
|
||||
skipDataDelete);
|
||||
});
|
||||
}
|
||||
return next(null, mpuBucket, destBucket, false);
|
||||
},
|
||||
function getPartLocations(mpuBucket, destBucket, skipDataDelete,
|
||||
next) {
|
||||
function getPartLocations(mpuBucket, destBucket, next) {
|
||||
services.getMPUparts(mpuBucket.getName(), uploadId, log,
|
||||
(err, result) => {
|
||||
if (err) {
|
||||
return next(err, destBucket);
|
||||
}
|
||||
const storedParts = result.Contents;
|
||||
return next(null, mpuBucket, storedParts, destBucket,
|
||||
skipDataDelete);
|
||||
return next(null, mpuBucket, storedParts, destBucket);
|
||||
});
|
||||
},
|
||||
function deleteData(mpuBucket, storedParts, destBucket,
|
||||
skipDataDelete, next) {
|
||||
// for Azure we do not need to delete data
|
||||
if (skipDataDelete) {
|
||||
return next(null, mpuBucket, storedParts, destBucket);
|
||||
}
|
||||
function deleteData(mpuBucket, storedParts, destBucket, next) {
|
||||
// The locations were sent to metadata as an array
|
||||
// under partLocations. Pull the partLocations.
|
||||
let locations = storedParts.map(item => item.value.partLocations);
|
||||
|
|
|
@ -7,8 +7,6 @@ const { pushMetric } = require('../utapi/utilities');
|
|||
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
|
||||
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
|
||||
require('./apiUtils/object/processMpuParts');
|
||||
const { config } = require('../Config');
|
||||
const multipleBackendGateway = require('../data/multipleBackendGateway');
|
||||
|
||||
const data = require('../data/wrapper');
|
||||
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
||||
|
@ -18,9 +16,6 @@ const { versioningPreprocessing, checkQueryVersionId }
|
|||
const metadata = require('../metadata/wrapper');
|
||||
const services = require('../services');
|
||||
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
|
||||
const { skipMpuPartProcessing } = require('../data/external/utils');
|
||||
const locationConstraintCheck
|
||||
= require('./apiUtils/object/locationConstraintCheck');
|
||||
const locationKeysSanityCheck
|
||||
= require('./apiUtils/object/locationKeysSanityCheck');
|
||||
|
||||
|
@ -159,13 +154,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
|||
if (err) {
|
||||
return next(err, destBucket);
|
||||
}
|
||||
const location = storedMetadata.controllingLocationConstraint;
|
||||
return next(null, destBucket, objMD, mpuBucket, jsonList,
|
||||
storedMetadata, location, mpuOverviewKey);
|
||||
storedMetadata, mpuOverviewKey);
|
||||
});
|
||||
},
|
||||
function retrieveParts(destBucket, objMD, mpuBucket, jsonList,
|
||||
storedMetadata, location, mpuOverviewKey, next) {
|
||||
storedMetadata, mpuOverviewKey, next) {
|
||||
return services.getMPUparts(mpuBucket.getName(), uploadId, log,
|
||||
(err, result) => {
|
||||
if (err) {
|
||||
|
@ -173,79 +167,22 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
|||
}
|
||||
const storedParts = result.Contents;
|
||||
return next(null, destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, location, mpuOverviewKey);
|
||||
jsonList, storedMetadata, mpuOverviewKey);
|
||||
});
|
||||
},
|
||||
function ifMultipleBackend(destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, location, mpuOverviewKey, next) {
|
||||
if (config.backends.data === 'multiple') {
|
||||
// if mpu was initiated in legacy version
|
||||
if (location === undefined) {
|
||||
const backendInfoObj = locationConstraintCheck(request,
|
||||
null, destBucket, log);
|
||||
if (backendInfoObj.err) {
|
||||
return process.nextTick(() => {
|
||||
next(backendInfoObj.err);
|
||||
});
|
||||
}
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
location = backendInfoObj.controllingLC;
|
||||
}
|
||||
const mdInfo = { storedParts, mpuOverviewKey, splitter };
|
||||
return multipleBackendGateway.completeMPU(objectKey,
|
||||
uploadId, location, jsonList, mdInfo, bucketName, null, null,
|
||||
log, (err, completeObjData) => {
|
||||
if (err) {
|
||||
return next(err, destBucket);
|
||||
}
|
||||
return next(null, destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, completeObjData,
|
||||
mpuOverviewKey);
|
||||
});
|
||||
}
|
||||
return next(null, destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, null, mpuOverviewKey);
|
||||
},
|
||||
function validateAndFilterParts(destBucket, objMD, mpuBucket,
|
||||
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
|
||||
storedParts, jsonList, storedMetadata, mpuOverviewKey,
|
||||
next) {
|
||||
if (completeObjData) {
|
||||
return next(null, destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
|
||||
completeObjData.filteredPartsObj);
|
||||
}
|
||||
const filteredPartsObj = validateAndFilterMpuParts(storedParts,
|
||||
jsonList, mpuOverviewKey, splitter, log);
|
||||
if (filteredPartsObj.error) {
|
||||
return next(filteredPartsObj.error, destBucket);
|
||||
}
|
||||
return next(null, destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
|
||||
filteredPartsObj);
|
||||
jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj);
|
||||
},
|
||||
function processParts(destBucket, objMD, mpuBucket, storedParts,
|
||||
jsonList, storedMetadata, completeObjData, mpuOverviewKey,
|
||||
filteredPartsObj, next) {
|
||||
// if mpu was completed on backend that stored mpu MD externally,
|
||||
// skip MD processing steps
|
||||
if (completeObjData && skipMpuPartProcessing(completeObjData)) {
|
||||
const dataLocations = [
|
||||
{
|
||||
key: completeObjData.key,
|
||||
size: completeObjData.contentLength,
|
||||
start: 0,
|
||||
dataStoreVersionId: completeObjData.dataStoreVersionId,
|
||||
dataStoreName: storedMetadata.dataStoreName,
|
||||
dataStoreETag: completeObjData.eTag,
|
||||
dataStoreType: completeObjData.dataStoreType,
|
||||
},
|
||||
];
|
||||
const calculatedSize = completeObjData.contentLength;
|
||||
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
|
||||
completeObjData.eTag, calculatedSize, dataLocations,
|
||||
[mpuOverviewKey], null, completeObjData);
|
||||
}
|
||||
|
||||
jsonList, storedMetadata, mpuOverviewKey, filteredPartsObj, next) {
|
||||
const partsInfo =
|
||||
generateMpuPartStorageInfo(filteredPartsObj.partList);
|
||||
if (partsInfo.error) {
|
||||
|
@ -254,28 +191,13 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
|||
const { keysToDelete, extraPartLocations } = filteredPartsObj;
|
||||
const { aggregateETag, dataLocations, calculatedSize } = partsInfo;
|
||||
|
||||
if (completeObjData) {
|
||||
const dataLocations = [
|
||||
{
|
||||
key: completeObjData.key,
|
||||
size: calculatedSize,
|
||||
start: 0,
|
||||
dataStoreName: storedMetadata.dataStoreName,
|
||||
dataStoreETag: aggregateETag,
|
||||
dataStoreType: completeObjData.dataStoreType,
|
||||
},
|
||||
];
|
||||
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
|
||||
aggregateETag, calculatedSize, dataLocations, keysToDelete,
|
||||
extraPartLocations, completeObjData);
|
||||
}
|
||||
return next(null, destBucket, objMD, mpuBucket, storedMetadata,
|
||||
aggregateETag, calculatedSize, dataLocations, keysToDelete,
|
||||
extraPartLocations, null);
|
||||
extraPartLocations);
|
||||
},
|
||||
function prepForStoring(destBucket, objMD, mpuBucket, storedMetadata,
|
||||
aggregateETag, calculatedSize, dataLocations, keysToDelete,
|
||||
extraPartLocations, completeObjData, next) {
|
||||
extraPartLocations, next) {
|
||||
const metaHeaders = {};
|
||||
const keysNotNeeded =
|
||||
['initiator', 'partLocations', 'key',
|
||||
|
@ -339,13 +261,12 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
|||
return next(null, destBucket, dataLocations,
|
||||
metaStoreParams, mpuBucket, keysToDelete, aggregateETag,
|
||||
objMD, extraPartLocations, pseudoCipherBundle,
|
||||
dataToDelete, completeObjData);
|
||||
dataToDelete);
|
||||
});
|
||||
},
|
||||
function storeAsNewObj(destinationBucket, dataLocations,
|
||||
metaStoreParams, mpuBucket, keysToDelete, aggregateETag, objMD,
|
||||
extraPartLocations, pseudoCipherBundle, dataToDelete,
|
||||
completeObjData, next) {
|
||||
extraPartLocations, pseudoCipherBundle, dataToDelete, next) {
|
||||
return services.metadataStoreObject(destinationBucket.getName(),
|
||||
dataLocations, pseudoCipherBundle, metaStoreParams,
|
||||
(err, res) => {
|
||||
|
|
|
@ -419,7 +419,7 @@ const services = {
|
|||
|
||||
/**
|
||||
* Checks whether bucket exists, multipart upload
|
||||
* has been initatied and the user is authorized
|
||||
* has been initiated and the user is authorized
|
||||
* @param {object} params - custom built object containing
|
||||
* bucket name, uploadId, authInfo etc.
|
||||
* @param {function} cb - callback containing error and
|
||||
|
|
|
@ -18,6 +18,7 @@ const { ds } = require('../../../lib/data/in_memory/backend');
|
|||
const initiateMultipartUpload
|
||||
= require('../../../lib/api/initiateMultipartUpload');
|
||||
const { metadata } = require('../../../lib/metadata/in_memory/metadata');
|
||||
const metadataBackend = require('../../../lib/metadata/in_memory/backend');
|
||||
const multipartDelete = require('../../../lib/api/multipartDelete');
|
||||
const objectPutPart = require('../../../lib/api/objectPutPart');
|
||||
const DummyRequest = require('../DummyRequest');
|
||||
|
@ -1695,28 +1696,9 @@ describe('complete mpu with versioning', () => {
|
|||
versioningTestUtils.createPutObjectRequest(bucketName, objectKey,
|
||||
data));
|
||||
|
||||
before(done => {
|
||||
beforeEach(done => {
|
||||
cleanup();
|
||||
async.series([
|
||||
callback => bucketPut(authInfo, bucketPutRequest, log,
|
||||
callback),
|
||||
// putting null version: put obj before versioning configured
|
||||
callback => objectPut(authInfo, testPutObjectRequests[0],
|
||||
undefined, log, callback),
|
||||
callback => bucketPutVersioning(authInfo,
|
||||
enableVersioningRequest, log, callback),
|
||||
// put another version:
|
||||
callback => objectPut(authInfo, testPutObjectRequests[1],
|
||||
undefined, log, callback),
|
||||
callback => bucketPutVersioning(authInfo,
|
||||
suspendVersioningRequest, log, callback),
|
||||
], err => {
|
||||
if (err) {
|
||||
return done(err);
|
||||
}
|
||||
versioningTestUtils.assertDataStoreValues(ds, objData.slice(0, 2));
|
||||
return done();
|
||||
});
|
||||
bucketPut(authInfo, bucketPutRequest, log, done);
|
||||
});
|
||||
|
||||
after(done => {
|
||||
|
@ -1727,8 +1709,21 @@ describe('complete mpu with versioning', () => {
|
|||
it('should delete null version when creating new null version, ' +
|
||||
'even when null version is not the latest version', done => {
|
||||
async.waterfall([
|
||||
next =>
|
||||
initiateMultipartUpload(authInfo, initiateRequest, log, next),
|
||||
// putting null version: put obj before versioning configured
|
||||
next => objectPut(authInfo, testPutObjectRequests[0],
|
||||
undefined, log, err => next(err)),
|
||||
next => bucketPutVersioning(authInfo,
|
||||
enableVersioningRequest, log, err => next(err)),
|
||||
// put another version:
|
||||
next => objectPut(authInfo, testPutObjectRequests[1],
|
||||
undefined, log, err => next(err)),
|
||||
next => bucketPutVersioning(authInfo,
|
||||
suspendVersioningRequest, log, err => next(err)),
|
||||
next => {
|
||||
versioningTestUtils.assertDataStoreValues(
|
||||
ds, objData.slice(0, 2));
|
||||
initiateMultipartUpload(authInfo, initiateRequest, log, next);
|
||||
},
|
||||
(result, corsHeaders, next) => parseString(result, next),
|
||||
(json, next) => {
|
||||
const partBody = objData[2];
|
||||
|
@ -1754,4 +1749,74 @@ describe('complete mpu with versioning', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should finish deleting metadata on completeMultipartUpload retry',
|
||||
done => {
|
||||
let origDeleteObject;
|
||||
async.waterfall([
|
||||
next => bucketPutVersioning(authInfo,
|
||||
enableVersioningRequest, log, err => next(err)),
|
||||
next =>
|
||||
initiateMultipartUpload(authInfo, initiateRequest, log, next),
|
||||
(result, corsHeaders, next) => parseString(result, next),
|
||||
(json, next) => {
|
||||
const partBody = objData[2];
|
||||
const testUploadId =
|
||||
json.InitiateMultipartUploadResult.UploadId[0];
|
||||
const partRequest = _createPutPartRequest(testUploadId, 1,
|
||||
partBody);
|
||||
objectPutPart(authInfo, partRequest, undefined, log,
|
||||
(err, eTag) => next(err, eTag, testUploadId));
|
||||
},
|
||||
(eTag, testUploadId, next) => {
|
||||
origDeleteObject = metadataBackend.deleteObject;
|
||||
metadataBackend.deleteObject = (
|
||||
bucketName, objName, params, log, cb) => {
|
||||
// prevent deletions from MPU bucket only
|
||||
if (bucketName === mpuBucket) {
|
||||
return process.nextTick(
|
||||
() => cb(errors.InternalError));
|
||||
}
|
||||
return origDeleteObject(
|
||||
bucketName, objName, params, log, cb);
|
||||
};
|
||||
const parts = [{ partNumber: 1, eTag }];
|
||||
const completeRequest = _createCompleteMpuRequest(
|
||||
testUploadId, parts);
|
||||
completeMultipartUpload(authInfo, completeRequest, log, err => {
|
||||
// expect a failure here because we could not
|
||||
// remove the overview key
|
||||
assert.strictEqual(err, errors.InternalError);
|
||||
next(null, eTag, testUploadId);
|
||||
});
|
||||
},
|
||||
(eTag, testUploadId, next) => {
|
||||
// allow MPU bucket metadata deletions to happen again
|
||||
metadataBackend.deleteObject = origDeleteObject;
|
||||
// retry the completeMultipartUpload with the same
|
||||
// metadata, as an application would normally do after
|
||||
// a failure
|
||||
const parts = [{ partNumber: 1, eTag }];
|
||||
const completeRequest = _createCompleteMpuRequest(
|
||||
testUploadId, parts);
|
||||
completeMultipartUpload(authInfo, completeRequest, log, next);
|
||||
},
|
||||
], err => {
|
||||
assert.ifError(err);
|
||||
let nbVersions = 0;
|
||||
for (const key of metadata.keyMaps.get(bucketName).keys()) {
|
||||
if (key !== objectKey && key.startsWith(objectKey)) {
|
||||
nbVersions += 1;
|
||||
}
|
||||
}
|
||||
// There should be only one version of the object, since
|
||||
// the second call should not have created a new version
|
||||
assert.strictEqual(nbVersions, 1);
|
||||
for (const key of metadata.keyMaps.get(mpuBucket).keys()) {
|
||||
assert.fail('There should be no more keys in MPU bucket, ' +
|
||||
`found "${key}"`);
|
||||
}
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
Loading…
Reference in New Issue