Compare commits

...

3 Commits

Author SHA1 Message Date
williamlardier e215e4a068 CLDSRV-514: add logs when potential orphans are created
Some APIs will delete the metadata before deleting the data on the
storage side: in this case, we log a specific warning with the
associated information, as a first way to keep track of such objects.
Future work will persist this information, to be processed by some
background service.
2024-03-06 15:04:55 +01:00
williamlardier 5301589013 CLDSRV-514: test orphan deletion 2024-03-06 15:04:55 +01:00
williamlardier 1b2e453e04 CLDSRV-514: clean orphans in storage if metadata step fails
Some APIs will do the following operations, sequentially:
- Store data in the storage service
- Store the associated metadata in the DB
- If an error occurs when dealing with the DB, return the
error to the client.

In such a scenario, the data is still present on the data disks,
and is never deleted.
This change ensures that, in case of an error, we properly clean up
the orphans.
2024-03-06 15:04:55 +01:00
8 changed files with 180 additions and 12 deletions
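
The third commit message above describes the failure mode this PR addresses: data is written to the storage backend first, and if the subsequent metadata write fails, the stored data becomes unreachable. A minimal standalone sketch of the cleanup pattern the diffs below apply, using hypothetical storeData/storeMetadata/deleteData stand-ins rather than cloudserver's real data and metadata APIs:

    const async = require('async');

    // Hypothetical stand-ins for the data and metadata backends.
    const storeData = (key, cb) => cb(null, [{ key, dataStoreName: 'mem' }]);
    const storeMetadata = (key, locations, cb) => cb(new Error('InternalError'));
    const deleteData = (locations, cb) => cb(null);

    function putObject(key, log, callback) {
        let storedLocations = null;      // hoisted so the final callback can see them
        let needsToCleanStorage = false;
        async.waterfall([
            next => storeData(key, (err, locations) => {
                storedLocations = locations;
                return next(err);
            }),
            next => storeMetadata(key, storedLocations, err => {
                if (err) {
                    // The metadata write failed after the data was persisted:
                    // remember that the stored data must be cleaned up.
                    needsToCleanStorage = true;
                }
                return next(err);
            }),
        ], err => {
            if (needsToCleanStorage && storedLocations) {
                // Best effort: delete the orphaned data, but never let the
                // cleanup result mask the original error returned to the client.
                return deleteData(storedLocations, delErr => {
                    if (delErr) {
                        log.warn('potential orphan in storage',
                            { error: delErr, objects: storedLocations });
                    }
                    return callback(err);
                });
            }
            return callback(err);
        });
    }

    // The client still receives the metadata error, but no data is left behind.
    putObject('my-object', console, err => console.log('client sees:', err.message));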

View File

@@ -197,6 +197,8 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
/* eslint-disable camelcase */
const dontSkipBackend = externalBackends;
/* eslint-enable camelcase */
let dataGetInfoArr;
let needsToCleanStorage = false;
const requestLogger =
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids());
@@ -250,7 +252,7 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
const prefixedDataStoreETag = dataStoreETag
? `1:${dataStoreETag}`
: `1:${calculatedHash}`;
const dataGetInfoArr = [{ key, size, start: 0, dataStoreName,
dataGetInfoArr = [{ key, size, start: 0, dataStoreName,
dataStoreType, dataStoreETag: prefixedDataStoreETag,
dataStoreVersionId }];
if (cipherBundle) {
@@ -294,11 +296,30 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
if (options.extraMD) {
Object.assign(metadataStoreParams, options.extraMD);
}
return _storeInMDandDeleteData(bucketName, infoArr,
return _storeInMDandDeleteData(bucketName, dataGetInfoArr,
cipherBundle, metadataStoreParams,
options.dataToDelete, requestLogger, requestMethod, next);
options.dataToDelete, requestLogger, requestMethod, (err, data) => {
if (err) {
needsToCleanStorage = true;
}
return next(err, data);
});
},
], callback);
], (err, result) => {
if (needsToCleanStorage && dataGetInfoArr) {
return data.batchDelete(dataGetInfoArr, requestMethod, null,
requestLogger, _err => {
if (_err) {
log.warn('potential orphan in storage', {
error: _err,
objects: dataGetInfoArr,
});
}
return callback(err, result);
});
}
return callback(err, result);
});
}
module.exports = createAndStoreObject;
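
One detail worth calling out in the hunk above: dataGetInfoArr changes from a const declared inside a waterfall step to a let declared before the waterfall. A variable declared inside a step is scoped to that step only, so the final callback could not reference it for cleanup; the same hoisting is applied to partLocations and objectLocationConstraint in the part-upload handlers further down. A reduced, hypothetical illustration of that scoping point:

    const async = require('async');

    function putSketch(callback) {
        let locations;                   // hoisted: visible in the final callback
        let needsToCleanStorage = false;
        async.waterfall([
            next => {
                locations = [{ key: 'a', dataStoreName: 'mem' }];
                return next(null);
            },
            next => {
                needsToCleanStorage = true;          // pretend the metadata write failed
                return next(new Error('InternalError'));
            },
        ], err => {
            if (needsToCleanStorage && locations) {
                // With a `const` inside the first step, `locations` would be
                // out of scope here and the cleanup could not run.
                console.log('would batch-delete', locations);
            }
            return callback(err);
        });
    }

    putSketch(err => console.log('returned to caller:', err.message));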

View File

@@ -364,7 +364,7 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
objMD, authInfo, canonicalID, null, request,
deleteInfo.newDeleteMarker, null, overheadField, log,
's3:ObjectRemoved:DeleteMarkerCreated', (err, result) =>
callback(err, objMD, deleteInfo, result.versionId));
callback(err, objMD, deleteInfo, result?.versionId));
},
], (err, objMD, deleteInfo, versionId) => {
if (err === skipError) {
@@ -428,7 +428,15 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
}
return async.each(chunks, (chunk, done) => data.batchDelete(chunk, null, null,
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), done),
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), err => {
if (err) {
log.warn('potential orphan in storage', {
error: err,
objects: chunk,
});
}
return done();
}),
err => {
if (err) {
log.error('error deleting objects from data backend', { error: err });
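
In the multi-object delete path above, the per-chunk callback logs a warning and then calls done() without forwarding the error, so one failing chunk no longer aborts async.each and the remaining chunks are still deleted; the client response is unchanged. A reduced sketch of that log-and-continue choice, with a hypothetical batchDelete stand-in:

    const async = require('async');

    // Hypothetical stand-in: deleting the second chunk fails.
    const batchDelete = (chunk, cb) =>
        cb(chunk.id === 2 ? new Error('backend unavailable') : null);

    const chunks = [{ id: 1 }, { id: 2 }, { id: 3 }];

    async.each(chunks, (chunk, done) => batchDelete(chunk, err => {
        if (err) {
            // Log and keep going: the other chunks are still deleted, and the
            // orphaned locations are recorded in the logs for later reconciliation.
            console.warn('potential orphan in storage', { error: err, objects: chunk });
        }
        return done();               // never propagate the per-chunk error
    }), err => {
        // err is never set here, so the overall response is unaffected.
        console.log('all chunks processed, error:', err);
    });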

View File

@@ -90,6 +90,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
partNumber: paddedPartNumber,
uploadId,
};
let needsToCleanStorage = false;
let partLocations;
return async.waterfall([
function checkDestAuth(next) {
@@ -279,6 +281,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
}
return next(error, destBucketMD);
}
partLocations = locations;
return next(null, destBucketMD, locations, eTag,
copyObjectSize, sourceVerId, serverSideEncryption,
lastModified, splitter);
@@ -331,6 +334,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
if (err) {
log.debug('error storing new metadata',
{ error: err, method: 'storeNewPartMetadata' });
needsToCleanStorage = true;
return next(err);
}
return next(null, locations, oldLocations, destBucketMD, totalHash,
@@ -411,6 +415,20 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
{ error: err });
monitoring.promMetrics('PUT', destBucketName, err.code,
'putObjectCopyPart');
if (needsToCleanStorage && partLocations) {
const delLog = logger.newRequestLoggerFromSerializedUids(
log.getSerializedUids());
return data.batchDelete(partLocations, request.method, null, delLog,
_err => {
if (_err) {
log.warn('potential orphan in storage', {
error: _err,
objects: partLocations,
});
}
return callback(err, null, corsHeaders);
});
}
return callback(err, null, corsHeaders);
}
const xml = [

View File

@@ -104,6 +104,9 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
const mpuBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
const { objectKey } = request;
const originalIdentityAuthzResults = request.actionImplicitDenies;
let needsToCleanStorage = false;
let partLocations;
let objectLocationConstraint;
return async.waterfall([
// Get the destination bucket.
@@ -195,7 +198,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
return next(errors.AccessDenied, destinationBucket);
}
const objectLocationConstraint =
objectLocationConstraint =
res.controllingLocationConstraint;
return next(null, destinationBucket,
objectLocationConstraint,
@@ -306,7 +309,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
prevObjectSize, oldLocations, objectLocationConstraint, splitter, next) => {
// Use an array to be consistent with objectPutCopyPart where there
// could be multiple locations.
const partLocations = [dataGetInfo];
partLocations = [dataGetInfo];
if (cipherBundle) {
const { algorithm, masterKeyId, cryptoScheme,
cipheredDataKey } = cipherBundle;
@@ -333,6 +336,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
error: err,
method: 'objectPutPart::metadata.putObjectMD',
});
needsToCleanStorage = true;
return next(err, destinationBucket);
}
return next(null, partLocations, oldLocations, objectLocationConstraint,
@@ -413,6 +417,20 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
});
monitoring.promMetrics('PUT', bucketName, err.code,
'putObjectPart');
if (needsToCleanStorage && partLocations) {
const delLog = logger.newRequestLoggerFromSerializedUids(
log.getSerializedUids());
return data.batchDelete(partLocations, request.method, objectLocationConstraint, delLog,
_err => {
if (_err) {
log.warn('potential orphan in storage', {
error: _err,
objects: partLocations,
});
}
return cb(err, null, corsHeaders);
});
}
return cb(err, null, corsHeaders);
}
pushMetric('uploadPart', log, {

View File

@@ -363,12 +363,24 @@ const services = {
}
if (!Array.isArray(objectMD.location)) {
data.delete(objectMD.location, deleteLog);
return cb(null, res);
return data.delete(objectMD.location, deleteLog, err => {
if (err) {
log.warn('potential orphan in storage', {
object: objectMD.location,
error: err,
});
return cb(err);
}
return cb(null, res);
});
}
return data.batchDelete(objectMD.location, null, null, deleteLog, err => {
if (err) {
log.warn('potential orphan in storage', {
objects: objectMD.location,
error: err,
});
return cb(err);
}
return cb(null, res);
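
The deleteObject change above replaces a fire-and-forget data.delete(objectMD.location, deleteLog) with a call that takes a completion callback, so a failed storage deletion is logged as a potential orphan and reported to the caller instead of being silently dropped. A minimal sketch of the difference, using a hypothetical dataDelete stand-in rather than the real data layer:

    // Hypothetical stand-in for the data layer; always fails for the demo.
    const dataDelete = (location, log, cb) => cb(new Error('backend unavailable'));

    // Before: fire-and-forget -- the error is lost and the caller reports success.
    function deleteObjectBefore(objectMD, log, cb) {
        dataDelete(objectMD.location, log, () => {});
        return cb(null, { deleted: true });
    }

    // After: the deletion result is observed, logged, and surfaced to the caller.
    function deleteObjectAfter(objectMD, log, cb) {
        return dataDelete(objectMD.location, log, err => {
            if (err) {
                log.warn('potential orphan in storage',
                    { object: objectMD.location, error: err });
                return cb(err);
            }
            return cb(null, { deleted: true });
        });
    }

    const objectMD = { location: { key: 'a', dataStoreName: 'mem' } };
    deleteObjectBefore(objectMD, console, (err, res) => console.log('before:', err, res));
    deleteObjectAfter(objectMD, console, err => console.log('after:', err && err.message));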

View File

@@ -100,6 +100,7 @@ const expectedRetentionConfig = {
const expectedLegalHold = {
Status: ['ON'],
};
const originalPutObjectMD = metadataswitch.putObjectMD;
function _createPutPartRequest(uploadId, partNumber, partBody) {
const md5Hash = crypto.createHash('md5').update(partBody);
@@ -148,6 +149,10 @@ describe('Multipart Upload API', () => {
cleanup();
});
afterEach(() => {
metadataswitch.putObjectMD = originalPutObjectMD;
});
it('mpuBucketPrefix should be a defined constant', () => {
assert(constants.mpuBucketPrefix,
'Expected mpuBucketPrefix to be defined');
@@ -269,6 +274,50 @@ describe('Multipart Upload API', () => {
});
});
it('should not create orphans in storage when uploading a part with a failed metadata update', done => {
async.waterfall([
next => bucketPut(authInfo, bucketPutRequest, log, next),
(corsHeaders, next) => initiateMultipartUpload(authInfo,
initiateRequest, log, next),
(result, corsHeaders, next) => {
const mpuKeys = metadata.keyMaps.get(mpuBucket);
assert.strictEqual(mpuKeys.size, 1);
assert(mpuKeys.keys().next().value
.startsWith(`overview${splitter}${objectKey}`));
parseString(result, next);
},
],
(err, json) => {
// Need to build the request here since the uploadId is not
// known until this point
assert.ifError(err);
const testUploadId = json.InitiateMultipartUploadResult.UploadId[0];
const md5Hash = crypto.createHash('md5');
const bufferBody = Buffer.from(postBody);
md5Hash.update(bufferBody);
const calculatedHash = md5Hash.digest('hex');
const partRequest = new DummyRequest({
bucketName,
objectKey,
namespace,
url: `/${objectKey}?partNumber=1&uploadId=${testUploadId}`,
headers: { host: `${bucketName}.s3.amazonaws.com` },
query: {
partNumber: '1',
uploadId: testUploadId,
},
calculatedHash,
}, postBody);
sinon.stub(metadataswitch, 'putObjectMD').callsArgWith(5, errors.InternalError);
objectPutPart(authInfo, partRequest, undefined, log, err => {
assert(err.is.InternalError);
assert.strictEqual(ds.filter(obj => obj.keyContext.objectKey === objectKey).length, 0);
done();
});
});
});
it('should upload a part even if the client sent a base 64 ETag ' +
'(and the stored ETag in metadata should be hex)', done => {
async.waterfall([

View File

@@ -2,7 +2,7 @@ const assert = require('assert');
const async = require('async');
const sinon = require('sinon');
const { parseString } = require('xml2js');
const { storage } = require('arsenal');
const { storage, errors } = require('arsenal');
const { bucketPut } = require('../../../lib/api/bucketPut');
const objectPut = require('../../../lib/api/objectPut');
const objectPutCopyPart = require('../../../lib/api/objectPutCopyPart');
@@ -14,6 +14,7 @@ const DummyRequest = require('../DummyRequest');
const { cleanup, DummyRequestLogger, makeAuthInfo, versioningTestUtils }
= require('../helpers');
const { ds } = storage.data.inMemory.datastore;
const log = new DummyRequestLogger();
const canonicalID = 'accessKey1';
const authInfo = makeAuthInfo(canonicalID);
@@ -137,4 +138,21 @@ describe('objectCopyPart', () => {
done();
});
});
it('should not create orphans in storage when copying a part with a failed metadata update', done => {
const testObjectCopyRequest = _createObjectCopyPartRequest(destBucketName, uploadId);
sinon.restore();
sinon.stub(metadataswitch, 'putObjectMD').callsArgWith(5, errors.InternalError);
const storedPartsBefore = ds.filter(obj => obj.keyContext.objectKey === objectKey
&& obj.keyContext.uploadId === uploadId).length;
objectPutCopyPart(authInfo, testObjectCopyRequest, sourceBucketName, objectKey, undefined, log, err => {
assert(err.is.InternalError);
// ensure the number of stored parts is the same
const storedPartsAfter = ds.filter(obj => obj.keyContext.objectKey === objectKey
&& obj.keyContext.uploadId === uploadId).length;
assert.strictEqual(storedPartsBefore, storedPartsAfter);
done();
});
});
});
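
The new tests above follow the same recipe: stub the metadata write so it fails, run the handler, assert that the in-memory datastore holds no leftover data for the object, and restore the stub afterwards so later tests see the real implementation. A generic sketch of that sinon pattern outside the cloudserver harness, assuming a mocha-style runner and an illustrative metadataLayer module:

    const assert = require('assert');
    const sinon = require('sinon');

    // Illustrative module under test: writes data, then metadata.
    const dataStore = [];
    const metadataLayer = {
        putObjectMD: (key, md, cb) => cb(null),
    };
    function putPart(key, body, cb) {
        dataStore.push({ key, body });
        metadataLayer.putObjectMD(key, {}, err => {
            if (err) {
                // Cleanup under test: remove the data we just wrote.
                dataStore.pop();
                return cb(err);
            }
            return cb(null);
        });
    }

    describe('putPart cleanup', () => {
        afterEach(() => sinon.restore());   // undo the stub for the next test

        it('leaves no data behind when the metadata write fails', done => {
            sinon.stub(metadataLayer, 'putObjectMD')
                .callsArgWith(2, new Error('InternalError'));   // fail the metadata step
            putPart('part-1', Buffer.from('x'), err => {
                assert(err);
                // Same shape of assertion as in the tests above: nothing orphaned.
                assert.strictEqual(dataStore.filter(o => o.key === 'part-1').length, 0);
                done();
            });
        });
    });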

View File

@@ -1,7 +1,7 @@
const assert = require('assert');
const async = require('async');
const moment = require('moment');
const { s3middleware, storage } = require('arsenal');
const { s3middleware, storage, errors } = require('arsenal');
const sinon = require('sinon');
const { bucketPut } = require('../../../lib/api/bucketPut');
@@ -18,6 +18,7 @@ const DummyRequest = require('../DummyRequest');
const { maximumAllowedUploadSize } = require('../../../constants');
const mpuUtils = require('../utils/mpuUtils');
const { lastModifiedHeader } = require('../../../constants');
const services = require('../../../lib/services');
const { ds } = storage.data.inMemory.datastore;
@@ -48,9 +49,12 @@ const testPutBucketRequestLock = new DummyRequest({
});
const originalputObjectMD = metadata.putObjectMD;
const originalMetadataStoreObject = services.metadataStoreObject;
const objectName = 'objectName';
const objectNameFailure = 'objectNameFailure';
let testPutObjectRequest;
let testPutObjectRequestFailure;
const enableVersioningRequest =
versioningTestUtils.createBucketPutVersioningReq(bucketName, 'Enabled');
const suspendVersioningRequest =
@@ -112,11 +116,19 @@ describe('objectPut API', () => {
headers: { host: `${bucketName}.s3.amazonaws.com` },
url: '/',
}, postBody);
testPutObjectRequestFailure = new DummyRequest({
bucketName,
namespace,
objectKey: objectNameFailure,
headers: { host: `${bucketName}.s3.amazonaws.com` },
url: '/',
}, postBody);
});
afterEach(() => {
sinon.restore();
metadata.putObjectMD = originalputObjectMD;
services.metadataStoreObject = originalMetadataStoreObject;
});
it('should return an error if the bucket does not exist', done => {
@@ -529,6 +541,18 @@ describe('objectPut API', () => {
});
});
it('should not leave orphans in data when the metadata storage step fails', done => {
sinon.stub(services, 'metadataStoreObject').callsArgWith(4, errors.InternalError);
bucketPut(authInfo, testPutBucketRequest, log, () => {
objectPut(authInfo, testPutObjectRequestFailure, undefined, log, err => {
assert(err.is.InternalError);
assert.strictEqual(ds.filter(obj => obj.keyContext.objectKey === objectNameFailure).length, 0);
done();
});
});
});
it('should not leave orphans in data when overwriting a multipart upload object', done => {
bucketPut(authInfo, testPutBucketRequest, log, () => {
mpuUtils.createMPU(namespace, bucketName, objectName, log,