Compare commits
3 Commits
developmen
...
bugfix/CLD
Author | SHA1 | Date |
---|---|---|
williamlardier | e215e4a068 | |
williamlardier | 5301589013 | |
williamlardier | 1b2e453e04 |
|
@ -197,6 +197,8 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
/* eslint-disable camelcase */
|
||||
const dontSkipBackend = externalBackends;
|
||||
/* eslint-enable camelcase */
|
||||
let dataGetInfoArr;
|
||||
let needsToCleanStorage = false;
|
||||
|
||||
const requestLogger =
|
||||
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids());
|
||||
|
@ -250,7 +252,7 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
const prefixedDataStoreETag = dataStoreETag
|
||||
? `1:${dataStoreETag}`
|
||||
: `1:${calculatedHash}`;
|
||||
const dataGetInfoArr = [{ key, size, start: 0, dataStoreName,
|
||||
dataGetInfoArr = [{ key, size, start: 0, dataStoreName,
|
||||
dataStoreType, dataStoreETag: prefixedDataStoreETag,
|
||||
dataStoreVersionId }];
|
||||
if (cipherBundle) {
|
||||
|
@ -294,11 +296,30 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
if (options.extraMD) {
|
||||
Object.assign(metadataStoreParams, options.extraMD);
|
||||
}
|
||||
return _storeInMDandDeleteData(bucketName, infoArr,
|
||||
return _storeInMDandDeleteData(bucketName, dataGetInfoArr,
|
||||
cipherBundle, metadataStoreParams,
|
||||
options.dataToDelete, requestLogger, requestMethod, next);
|
||||
options.dataToDelete, requestLogger, requestMethod, (err, data) => {
|
||||
if (err) {
|
||||
needsToCleanStorage = true;
|
||||
}
|
||||
return next(err, data);
|
||||
});
|
||||
},
|
||||
], callback);
|
||||
], (err, result) => {
|
||||
if (needsToCleanStorage && dataGetInfoArr) {
|
||||
return data.batchDelete(dataGetInfoArr, requestMethod, null,
|
||||
requestLogger, _err => {
|
||||
if (_err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
error: _err,
|
||||
objects: dataGetInfoArr,
|
||||
});
|
||||
}
|
||||
return callback(err, result);
|
||||
});
|
||||
}
|
||||
return callback(err, result);
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = createAndStoreObject;
|
||||
|
|
|
@ -364,7 +364,7 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
|
|||
objMD, authInfo, canonicalID, null, request,
|
||||
deleteInfo.newDeleteMarker, null, overheadField, log,
|
||||
's3:ObjectRemoved:DeleteMarkerCreated', (err, result) =>
|
||||
callback(err, objMD, deleteInfo, result.versionId));
|
||||
callback(err, objMD, deleteInfo, result?.versionId));
|
||||
},
|
||||
], (err, objMD, deleteInfo, versionId) => {
|
||||
if (err === skipError) {
|
||||
|
@ -428,7 +428,15 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
|
|||
}
|
||||
|
||||
return async.each(chunks, (chunk, done) => data.batchDelete(chunk, null, null,
|
||||
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), done),
|
||||
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), err => {
|
||||
if (err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
error: err,
|
||||
objects: chunk,
|
||||
});
|
||||
}
|
||||
return done();
|
||||
}),
|
||||
err => {
|
||||
if (err) {
|
||||
log.error('error deleting objects from data backend', { error: err });
|
||||
|
|
|
@ -90,6 +90,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
partNumber: paddedPartNumber,
|
||||
uploadId,
|
||||
};
|
||||
let needsToCleanStorage = false;
|
||||
let partLocations;
|
||||
|
||||
return async.waterfall([
|
||||
function checkDestAuth(next) {
|
||||
|
@ -279,6 +281,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
}
|
||||
return next(error, destBucketMD);
|
||||
}
|
||||
partLocations = locations;
|
||||
return next(null, destBucketMD, locations, eTag,
|
||||
copyObjectSize, sourceVerId, serverSideEncryption,
|
||||
lastModified, splitter);
|
||||
|
@ -331,6 +334,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
if (err) {
|
||||
log.debug('error storing new metadata',
|
||||
{ error: err, method: 'storeNewPartMetadata' });
|
||||
needsToCleanStorage = true;
|
||||
return next(err);
|
||||
}
|
||||
return next(null, locations, oldLocations, destBucketMD, totalHash,
|
||||
|
@ -411,6 +415,20 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
{ error: err });
|
||||
monitoring.promMetrics('PUT', destBucketName, err.code,
|
||||
'putObjectCopyPart');
|
||||
if (needsToCleanStorage && partLocations) {
|
||||
const delLog = logger.newRequestLoggerFromSerializedUids(
|
||||
log.getSerializedUids());
|
||||
return data.batchDelete(partLocations, request.method, null, delLog,
|
||||
_err => {
|
||||
if (_err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
error: _err,
|
||||
objects: partLocations,
|
||||
});
|
||||
}
|
||||
return callback(err, null, corsHeaders);
|
||||
});
|
||||
}
|
||||
return callback(err, null, corsHeaders);
|
||||
}
|
||||
const xml = [
|
||||
|
|
|
@ -104,6 +104,9 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
const mpuBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
|
||||
const { objectKey } = request;
|
||||
const originalIdentityAuthzResults = request.actionImplicitDenies;
|
||||
let needsToCleanStorage = false;
|
||||
let partLocations;
|
||||
let objectLocationConstraint;
|
||||
|
||||
return async.waterfall([
|
||||
// Get the destination bucket.
|
||||
|
@ -195,7 +198,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
return next(errors.AccessDenied, destinationBucket);
|
||||
}
|
||||
|
||||
const objectLocationConstraint =
|
||||
objectLocationConstraint =
|
||||
res.controllingLocationConstraint;
|
||||
return next(null, destinationBucket,
|
||||
objectLocationConstraint,
|
||||
|
@ -306,7 +309,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
prevObjectSize, oldLocations, objectLocationConstraint, splitter, next) => {
|
||||
// Use an array to be consistent with objectPutCopyPart where there
|
||||
// could be multiple locations.
|
||||
const partLocations = [dataGetInfo];
|
||||
partLocations = [dataGetInfo];
|
||||
if (cipherBundle) {
|
||||
const { algorithm, masterKeyId, cryptoScheme,
|
||||
cipheredDataKey } = cipherBundle;
|
||||
|
@ -333,6 +336,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
error: err,
|
||||
method: 'objectPutPart::metadata.putObjectMD',
|
||||
});
|
||||
needsToCleanStorage = true;
|
||||
return next(err, destinationBucket);
|
||||
}
|
||||
return next(null, partLocations, oldLocations, objectLocationConstraint,
|
||||
|
@ -413,6 +417,20 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
});
|
||||
monitoring.promMetrics('PUT', bucketName, err.code,
|
||||
'putObjectPart');
|
||||
if (needsToCleanStorage && partLocations) {
|
||||
const delLog = logger.newRequestLoggerFromSerializedUids(
|
||||
log.getSerializedUids());
|
||||
return data.batchDelete(partLocations, request.method, objectLocationConstraint, delLog,
|
||||
_err => {
|
||||
if (_err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
error: _err,
|
||||
objects: partLocations,
|
||||
});
|
||||
}
|
||||
return cb(err, null, corsHeaders);
|
||||
});
|
||||
}
|
||||
return cb(err, null, corsHeaders);
|
||||
}
|
||||
pushMetric('uploadPart', log, {
|
||||
|
|
|
@ -363,12 +363,24 @@ const services = {
|
|||
}
|
||||
|
||||
if (!Array.isArray(objectMD.location)) {
|
||||
data.delete(objectMD.location, deleteLog);
|
||||
return data.delete(objectMD.location, deleteLog, err => {
|
||||
if (err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
object: objectMD.location,
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, res);
|
||||
});
|
||||
}
|
||||
|
||||
return data.batchDelete(objectMD.location, null, null, deleteLog, err => {
|
||||
if (err) {
|
||||
log.warn('potential orphan in storage', {
|
||||
objects: objectMD.location,
|
||||
error: err,
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, res);
|
||||
|
|
|
@ -100,6 +100,7 @@ const expectedRetentionConfig = {
|
|||
const expectedLegalHold = {
|
||||
Status: ['ON'],
|
||||
};
|
||||
const originalPutObjectMD = metadataswitch.putObjectMD;
|
||||
|
||||
function _createPutPartRequest(uploadId, partNumber, partBody) {
|
||||
const md5Hash = crypto.createHash('md5').update(partBody);
|
||||
|
@ -148,6 +149,10 @@ describe('Multipart Upload API', () => {
|
|||
cleanup();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
metadataswitch.putObjectMD = originalPutObjectMD;
|
||||
});
|
||||
|
||||
it('mpuBucketPrefix should be a defined constant', () => {
|
||||
assert(constants.mpuBucketPrefix,
|
||||
'Expected mpuBucketPrefix to be defined');
|
||||
|
@ -269,6 +274,50 @@ describe('Multipart Upload API', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('should not create orphans in storage when uplading a part with a failed metadata update', done => {
|
||||
async.waterfall([
|
||||
next => bucketPut(authInfo, bucketPutRequest, log, next),
|
||||
(corsHeaders, next) => initiateMultipartUpload(authInfo,
|
||||
initiateRequest, log, next),
|
||||
(result, corsHeaders, next) => {
|
||||
const mpuKeys = metadata.keyMaps.get(mpuBucket);
|
||||
assert.strictEqual(mpuKeys.size, 1);
|
||||
assert(mpuKeys.keys().next().value
|
||||
.startsWith(`overview${splitter}${objectKey}`));
|
||||
parseString(result, next);
|
||||
},
|
||||
],
|
||||
(err, json) => {
|
||||
// Need to build request in here since do not have uploadId
|
||||
// until here
|
||||
assert.ifError(err);
|
||||
const testUploadId = json.InitiateMultipartUploadResult.UploadId[0];
|
||||
const md5Hash = crypto.createHash('md5');
|
||||
const bufferBody = Buffer.from(postBody);
|
||||
md5Hash.update(bufferBody);
|
||||
const calculatedHash = md5Hash.digest('hex');
|
||||
const partRequest = new DummyRequest({
|
||||
bucketName,
|
||||
objectKey,
|
||||
namespace,
|
||||
url: `/${objectKey}?partNumber=1&uploadId=${testUploadId}`,
|
||||
headers: { host: `${bucketName}.s3.amazonaws.com` },
|
||||
query: {
|
||||
partNumber: '1',
|
||||
uploadId: testUploadId,
|
||||
},
|
||||
calculatedHash,
|
||||
}, postBody);
|
||||
sinon.stub(metadataswitch, 'putObjectMD').callsArgWith(5, errors.InternalError);
|
||||
objectPutPart(authInfo, partRequest, undefined, log, err => {
|
||||
assert(err.is.InternalError);
|
||||
assert.strictEqual(ds.filter(obj => obj.keyContext.objectKey === objectKey).length, 0);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
it('should upload a part even if the client sent a base 64 ETag ' +
|
||||
'(and the stored ETag in metadata should be hex)', done => {
|
||||
async.waterfall([
|
||||
|
|
|
@ -2,7 +2,7 @@ const assert = require('assert');
|
|||
const async = require('async');
|
||||
const sinon = require('sinon');
|
||||
const { parseString } = require('xml2js');
|
||||
const { storage } = require('arsenal');
|
||||
const { storage, errors } = require('arsenal');
|
||||
const { bucketPut } = require('../../../lib/api/bucketPut');
|
||||
const objectPut = require('../../../lib/api/objectPut');
|
||||
const objectPutCopyPart = require('../../../lib/api/objectPutCopyPart');
|
||||
|
@ -14,6 +14,7 @@ const DummyRequest = require('../DummyRequest');
|
|||
const { cleanup, DummyRequestLogger, makeAuthInfo, versioningTestUtils }
|
||||
= require('../helpers');
|
||||
|
||||
const { ds } = storage.data.inMemory.datastore;
|
||||
const log = new DummyRequestLogger();
|
||||
const canonicalID = 'accessKey1';
|
||||
const authInfo = makeAuthInfo(canonicalID);
|
||||
|
@ -137,4 +138,21 @@ describe('objectCopyPart', () => {
|
|||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should not create orphans in storage when copying a part with a failed metadata update', done => {
|
||||
const testObjectCopyRequest = _createObjectCopyPartRequest(destBucketName, uploadId);
|
||||
sinon.restore();
|
||||
sinon.stub(metadataswitch, 'putObjectMD').callsArgWith(5, errors.InternalError);
|
||||
const storedPartsBefore = ds.filter(obj => obj.keyContext.objectKey === objectKey
|
||||
&& obj.keyContext.uploadId === uploadId).length;
|
||||
|
||||
objectPutCopyPart(authInfo, testObjectCopyRequest, sourceBucketName, objectKey, undefined, log, err => {
|
||||
assert(err.is.InternalError);
|
||||
// ensure the number of stored parts is the same
|
||||
const storedPartsAfter = ds.filter(obj => obj.keyContext.objectKey === objectKey
|
||||
&& obj.keyContext.uploadId === uploadId).length;
|
||||
assert.strictEqual(storedPartsBefore, storedPartsAfter);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const moment = require('moment');
|
||||
const { s3middleware, storage } = require('arsenal');
|
||||
const { s3middleware, storage, errors } = require('arsenal');
|
||||
const sinon = require('sinon');
|
||||
|
||||
const { bucketPut } = require('../../../lib/api/bucketPut');
|
||||
|
@ -18,6 +18,7 @@ const DummyRequest = require('../DummyRequest');
|
|||
const { maximumAllowedUploadSize } = require('../../../constants');
|
||||
const mpuUtils = require('../utils/mpuUtils');
|
||||
const { lastModifiedHeader } = require('../../../constants');
|
||||
const services = require('../../../lib/services');
|
||||
|
||||
const { ds } = storage.data.inMemory.datastore;
|
||||
|
||||
|
@ -48,9 +49,12 @@ const testPutBucketRequestLock = new DummyRequest({
|
|||
});
|
||||
|
||||
const originalputObjectMD = metadata.putObjectMD;
|
||||
const originalMetadataStoreObject = services.metadataStoreObject;
|
||||
const objectName = 'objectName';
|
||||
const objectNameFailure = 'objectNameFailure';
|
||||
|
||||
let testPutObjectRequest;
|
||||
let testPutObjectRequestFailure;
|
||||
const enableVersioningRequest =
|
||||
versioningTestUtils.createBucketPutVersioningReq(bucketName, 'Enabled');
|
||||
const suspendVersioningRequest =
|
||||
|
@ -112,11 +116,19 @@ describe('objectPut API', () => {
|
|||
headers: { host: `${bucketName}.s3.amazonaws.com` },
|
||||
url: '/',
|
||||
}, postBody);
|
||||
testPutObjectRequestFailure = new DummyRequest({
|
||||
bucketName,
|
||||
namespace,
|
||||
objectKey: objectNameFailure,
|
||||
headers: { host: `${bucketName}.s3.amazonaws.com` },
|
||||
url: '/',
|
||||
}, postBody);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
sinon.restore();
|
||||
metadata.putObjectMD = originalputObjectMD;
|
||||
services.metadataStoreObject = originalMetadataStoreObject;
|
||||
});
|
||||
|
||||
it('should return an error if the bucket does not exist', done => {
|
||||
|
@ -529,6 +541,18 @@ describe('objectPut API', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('should not leave orphans in data when the metadata storage step fails', done => {
|
||||
sinon.stub(services, 'metadataStoreObject').callsArgWith(4, errors.InternalError);
|
||||
|
||||
bucketPut(authInfo, testPutBucketRequest, log, () => {
|
||||
objectPut(authInfo, testPutObjectRequestFailure, undefined, log, err => {
|
||||
assert(err.is.InternalError);
|
||||
assert.strictEqual(ds.filter(obj => obj.keyContext.objectKey === objectNameFailure).length, 0);
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should not leave orphans in data when overwriting an multipart upload object', done => {
|
||||
bucketPut(authInfo, testPutBucketRequest, log, () => {
|
||||
mpuUtils.createMPU(namespace, bucketName, objectName, log,
|
||||
|
|
Loading…
Reference in New Issue