Compare commits

...

4 Commits

Author SHA1 Message Date
Dora Korpar 45c7fd09d9 imprv: S3C-3779 add mp-parts-count header
(cherry picked from commit 0f4154636a)
2021-01-14 14:16:27 -08:00
Jonathan Gramain 71bdf4f244 ft: ZENKO-143 batchDelete backbeat route
- Implement the 'POST /_/backbeat/batchdelete' backbeat route to delete a
  batch of data locations. The route will be used by the garbage
  collector service.

  * This includes some reorganization of the backbeat route sanity checks

- Gracefully handle 404 errors from the datastore backend:

  * no need to retry 404 errors as they are permanent

  * batch delete also handles missing data locations gracefully and
    still deletes the other existing locations (this may happen if a
    retry occurs, e.g. when replaying a Kafka message).

- Support service-gc service account with account owner access rights.

(cherry picked from commit 8874f97045)

Ticket associated with the cherry-pick: S3C-2665

(cherry picked from commit 5bc06d7c37)
2021-01-11 19:04:38 -08:00
Dora Korpar 3f4a23991e bf: S3C-3313 add canonicalid for abortmpu metric
(cherry picked from commit d449543eff)
2020-12-29 13:23:22 -08:00
Dora Korpar 84d18e497d bf: S3C-3313 storageUtilized for deleteBucket abortMPUs
(cherry picked from commit ead94d9193)
2020-12-29 13:23:14 -08:00
11 changed files with 342 additions and 43 deletions

View File

@ -3,6 +3,7 @@ const async = require('async');
const { errors } = require('arsenal');
const abortMultipartUpload = require('../object/abortMultipartUpload');
const { pushMetric } = require('../../../utapi/utilities');
const { splitter, oldSplitter, mpuBucketPrefix } =
require('../../../../constants');
@ -22,14 +23,23 @@ function _deleteMPUbucket(destinationBucketName, log, cb) {
});
}
function _deleteOngoingMPUs(authInfo, bucketName, mpus, log, cb) {
function _deleteOngoingMPUs(authInfo, bucketName, bucketMD, mpus, log, cb) {
async.mapLimit(mpus, 1, (mpu, next) => {
const splitterChar = mpu.key.includes(oldSplitter) ?
oldSplitter : splitter;
// `overview${splitter}${objectKey}${splitter}${uploadId}`
const [, objectKey, uploadId] = mpu.key.split(splitterChar);
abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
next);
(err, destBucket, partSizeSum) => {
pushMetric('abortMultipartUpload', log, {
authInfo,
canonicalID: bucketMD.getOwner(),
bucket: bucketName,
keys: [objectKey],
byteLength: partSizeSum,
});
next(err);
});
}, cb);
}
/**
@ -89,7 +99,7 @@ function deleteBucket(authInfo, bucketMD, bucketName, canonicalID, log, cb) {
}
if (objectsListRes.Contents.length) {
return _deleteOngoingMPUs(authInfo, bucketName,
objectsListRes.Contents, log, err => {
bucketMD, objectsListRes.Contents, log, err => {
if (err) {
return next(err);
}

View File

@ -54,7 +54,21 @@ function getPartSize(objMD, partNumber) {
return size;
}
/**
* Gets the parts count if the object was put with MPU
* @param {object} objMD - object metadata
* @return {(Integer|null)} - number of parts of mpu object or null
*/
function getPartCountFromMd5(objMD) {
const md5 = objMD['content-md5'];
if (md5.includes('-')) {
return md5.split('-')[1];
}
return null;
}
module.exports = {
getPartNumber,
getPartSize,
getPartCountFromMd5,
};
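
For reference, objects created through multipart upload carry a content-md5 of the form '<md5-of-part-digests>-<parts count>', whereas a regular PUT stores a plain MD5 digest, so the suffix after the dash is exactly what this helper extracts. A minimal sketch of its behaviour (the require path and the MPU digest are illustrative):

const { getPartCountFromMd5 } =
    require('./lib/api/apiUtils/object/partInfo');

// MPU object: content-md5 ends in '-<parts count>'
getPartCountFromMd5({
    'content-md5': '9b2cf535f27731c974343645a3985328-10',
}); // => '10' (the raw substring, which is what the header carries)

// Regular PUT: plain MD5, no dash
getPartCountFromMd5({
    'content-md5': '5d41402abc4b2a76b9719d911017c592',
}); // => null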

View File

@ -10,6 +10,7 @@ const { pushMetric } = require('../utapi/utilities');
const { getVersionIdResHeader } = require('./apiUtils/object/versioning');
const setPartRanges = require('./apiUtils/object/setPartRanges');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { getPartCountFromMd5 } = require('./apiUtils/object/partInfo');
const validateHeaders = s3middleware.validateConditionalHeaders;
@ -189,6 +190,11 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
responseMetaHeaders['Content-Length'] = end - start + 1;
const partByteRange = [start, end];
dataLocator = setPartRanges(dataLocator, partByteRange);
const partsCount = getPartCountFromMd5(objMD);
if (partsCount) {
responseMetaHeaders['x-amz-mp-parts-count'] =
partsCount;
}
} else {
dataLocator = setPartRanges(dataLocator, byteRange);
}
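
With this hunk, a part-number GET on an MPU object also reports the total number of parts. A hedged sketch with the JavaScript AWS SDK, assuming an already-configured s3 client and placeholder bucket/key names (the objectHead.js change below gives HEAD requests the same behaviour):

s3.getObject({
    Bucket: 'my-bucket',
    Key: 'my-mpu-object',
    PartNumber: 1,
}, (err, data) => {
    if (err) {
        throw err;
    }
    // ContentLength is the size of part 1 only; the SDK exposes
    // x-amz-mp-parts-count as PartsCount. For objects created with a
    // regular PUT the field is simply absent.
    console.log(data.ContentLength, data.PartsCount);
});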

View File

@ -7,7 +7,9 @@ const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const collectResponseHeaders = require('../utilities/collectResponseHeaders');
const { pushMetric } = require('../utapi/utilities');
const { getVersionIdResHeader } = require('./apiUtils/object/versioning');
const { getPartNumber, getPartSize } = require('./apiUtils/object/partInfo');
const { getPartNumber, getPartSize, getPartCountFromMd5 } =
require('./apiUtils/object/partInfo');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { maximumAllowedPartCount } = require('../../constants');
@ -120,6 +122,10 @@ function objectHead(authInfo, request, log, callback) {
return callback(errors.InvalidRange, corsHeaders);
}
responseHeaders['content-length'] = partSize;
const partsCount = getPartCountFromMd5(objMD);
if (partsCount) {
responseHeaders['x-amz-mp-parts-count'] = partsCount;
}
}
pushMetric('headObject', log, {
authInfo,

View File

@ -85,6 +85,11 @@ function _retryDelete(objectGetInfo, log, count, cb) {
}
return client.delete(objectGetInfo, log.getSerializedUids(), err => {
if (err) {
if (err.ObjNotFound) {
log.info('no such key in datastore',
{ objectGetInfo, implName, moreRetries: 'no' });
return cb(err);
}
log.error('delete error from datastore',
{ error: err, implName, moreRetries: 'yes' });
return _retryDelete(objectGetInfo, log, count + 1, cb);
@ -253,7 +258,7 @@ const data = {
return;
}
_retryDelete(clientGetInfo, log, 0, err => {
if (err) {
if (err && !err.ObjNotFound) {
log.error('delete error from datastore',
{ error: err, key: objectGetInfo.key, moreRetries: 'no' });
}

View File

@ -5,6 +5,7 @@ const { auth, errors, s3middleware } = require('arsenal');
const { responseJSONBody } = require('arsenal').s3routes.routesUtils;
const { getSubPartIds } = s3middleware.azureHelper.mpuUtils;
const vault = require('../auth/vault');
const data = require('../data/wrapper');
const metadata = require('../metadata/wrapper');
const locationConstraintCheck = require(
'../api/apiUtils/object/locationConstraintCheck');
@ -30,7 +31,7 @@ function _decodeURI(uri) {
return decodeURIComponent(uri.replace(/\+/g, ' '));
}
function normalizeBackbeatRequest(req) {
function _normalizeBackbeatRequest(req) {
/* eslint-disable no-param-reassign */
const parsedUrl = url.parse(req.url, true);
req.path = _decodeURI(parsedUrl.pathname);
@ -42,6 +43,11 @@ function normalizeBackbeatRequest(req) {
/* eslint-enable no-param-reassign */
}
function _isObjectRequest(req) {
return ['data', 'metadata', 'multiplebackenddata']
.includes(req.resourceType);
}
function _respond(response, payload, log, callback) {
const body = typeof payload === 'object' ?
JSON.stringify(payload) : payload;
@ -217,10 +223,17 @@ PUT /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=putobject
PUT /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=putpart
DELETE /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=deleteobject
DELETE /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=deleteobjecttagging
POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=initiatempu
POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=completempu
POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=puttagging
POST /_/backbeat/batchdelete
*/
function putData(request, response, bucketInfo, objMd, log, callback) {
@ -615,6 +628,49 @@ function deleteObjectTagging(request, response, log, callback) {
dataStoreVersionId, log, callback);
}
function batchDelete(request, response, log, callback) {
return _getRequestPayload(request, (err, payload) => {
if (err) {
return callback(err);
}
let request;
try {
request = JSON.parse(payload);
} catch (e) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
if (!request || !Array.isArray(request.Locations)) {
return callback(errors.MalformedPOSTRequest);
}
const locations = request.Locations;
log.trace('batch delete locations', { locations });
return async.eachLimit(locations, 5, (loc, next) => {
data.delete(loc, log, err => {
if (err && err.ObjNotFound) {
log.info('batch delete: data location does not exist', {
method: 'batchDelete',
location: loc,
});
return next();
}
return next(err);
});
}, err => {
if (err) {
log.error('batch delete failed', {
method: 'batchDelete',
locations,
error: err,
});
return callback(err);
}
log.debug('batch delete successful', { locations });
return _respond(response, null, log, callback);
});
});
}
const backbeatRoutes = {
PUT: {
data: putData,
@ -630,6 +686,7 @@ const backbeatRoutes = {
completempu: completeMultipartUpload,
puttagging: putObjectTagging,
},
batchdelete: batchDelete,
},
DELETE: {
multiplebackenddata: {
@ -643,27 +700,47 @@ const backbeatRoutes = {
};
function routeBackbeat(clientIP, request, response, log) {
log.debug('routing request', { method: 'routeBackbeat' });
normalizeBackbeatRequest(request);
log.debug('routing request', {
method: 'routeBackbeat',
url: request.url,
});
_normalizeBackbeatRequest(request);
const useMultipleBackend = request.resourceType === 'multiplebackenddata';
const invalidRequest = (!request.bucketName ||
!request.objectKey ||
!request.resourceType ||
const invalidRequest =
(!request.resourceType ||
(_isObjectRequest(request) &&
(!request.bucketName || !request.objectKey)) ||
(!request.query.operation && useMultipleBackend));
const invalidRoute =
(backbeatRoutes[request.method] === undefined ||
backbeatRoutes[request.method][request.resourceType] === undefined ||
(backbeatRoutes[request.method][request.resourceType]
[request.query.operation] === undefined &&
useMultipleBackend));
log.addDefaultFields({
bucketName: request.bucketName,
objectKey: request.objectKey,
bytesReceived: request.parsedContentLength || 0,
bodyLength: parseInt(request.headers['content-length'], 10) || 0,
});
if (invalidRequest) {
log.debug('invalid request', {
method: request.method,
resourceType: request.resourceType,
if (invalidRequest || invalidRoute) {
log.debug(invalidRequest ? 'invalid request' : 'no such route', {
method: request.method, bucketName: request.bucketName,
objectKey: request.objectKey, resourceType: request.resourceType,
query: request.query,
});
return responseJSONBody(errors.MethodNotAllowed, null, response, log);
}
if (!_isObjectRequest(request)) {
const route = backbeatRoutes[request.method][request.resourceType];
return route(request, response, log, err => {
if (err) {
return responseJSONBody(err, null, response, log);
}
return undefined;
});
}
const requestContexts = prepareRequestContexts('objectReplicate', request);
const decodedVidResult = decodeVersionId(request.query);
if (decodedVidResult instanceof Error) {
@ -699,21 +776,6 @@ function routeBackbeat(clientIP, request, response, log) {
return metadataValidateBucketAndObj(mdValParams, log, next);
},
(bucketInfo, objMd, next) => {
const invalidRoute = backbeatRoutes[request.method] === undefined ||
backbeatRoutes[request.method][request.resourceType] ===
undefined ||
(backbeatRoutes[request.method][request.resourceType]
[request.query.operation] === undefined &&
useMultipleBackend);
if (invalidRoute) {
log.debug('no such route', { method: request.method,
bucketName: request.bucketName,
objectKey: request.objectKey,
resourceType: request.resourceType,
query: request.query,
});
return next(errors.MethodNotAllowed);
}
if (useMultipleBackend) {
return backbeatRoutes[request.method][request.resourceType]
[request.query.operation](request, response, log, next);
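
The new non-object route expects a JSON body with a Locations array, the same shape the functional test below sends. A rough sketch of the call using Node's http module, with placeholder host and port and request signing omitted (the real route is authenticated; the tests use backbeatAuthCredentials):

const http = require('http');

const body = JSON.stringify({
    Locations: [
        { key: 'abcdef', dataStoreName: 'us-east-1' },
    ],
});
const req = http.request({
    host: 'localhost',
    port: 8000,
    method: 'POST',
    path: '/_/backbeat/batchdelete',
    headers: { 'content-length': Buffer.byteLength(body) },
}, res => {
    // 200 with an empty body on success; malformed JSON is rejected with
    // MalformedPOSTRequest, and already-deleted locations are skipped.
    console.log(res.statusCode);
});
req.end(body);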

View File

@ -66,6 +66,7 @@
"ft_awssdk_external_backends": "cd tests/functional/aws-node-sdk && mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json test/multipleBackend",
"ft_management": "cd tests/functional/report && yarn test",
"ft_node": "cd tests/functional/raw-node && yarn test",
"ft_node_routes": "cd tests/functional/raw-node && yarn run test-routes",
"ft_gcp": "cd tests/functional/raw-node && yarn run test-gcp",
"ft_healthchecks": "cd tests/functional/healthchecks && yarn test",
"ft_s3cmd": "cd tests/functional/s3cmd && mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json -t 40000 *.js",

View File

@ -36,8 +36,8 @@ function checkError(err, code) {
assert.strictEqual(err.code, code);
}
function checkContentLength(contentLengthHeader, expectedSize) {
assert.strictEqual(Number.parseInt(contentLengthHeader, 10), expectedSize);
function checkIntegerHeader(integerHeader, expectedSize) {
assert.strictEqual(Number.parseInt(integerHeader, 10), expectedSize);
}
function dateFromNow(diff) {
@ -69,7 +69,7 @@ describe('GET object', () => {
PartNumber: partNumber,
}, (err, data) => {
checkNoError(err);
checkContentLength(data.ContentLength, len);
checkIntegerHeader(data.ContentLength, len);
const md5Hash = crypto.createHash('md5');
const md5HashExpected = crypto.createHash('md5');
assert.strictEqual(
@ -759,7 +759,7 @@ describe('GET object', () => {
checkNoError(err);
return requestGet({ PartNumber: num }, (err, data) => {
checkNoError(err);
checkContentLength(data.ContentLength, partSize);
checkIntegerHeader(data.ContentLength, partSize);
const md5Hash = crypto.createHash('md5');
const md5HashExpected = crypto.createHash('md5');
const expected = Buffer.alloc(partSize).fill(num);
@ -778,7 +778,7 @@ describe('GET object', () => {
checkNoError(err);
return requestGet({ PartNumber: num }, (err, data) => {
checkNoError(err);
checkContentLength(data.ContentLength, partSize);
checkIntegerHeader(data.ContentLength, partSize);
const md5Hash = crypto.createHash('md5');
const md5HashExpected = crypto.createHash('md5');
const expected = Buffer.alloc(partSize)
@ -838,7 +838,7 @@ describe('GET object', () => {
}, err => {
checkNoError(err);
return requestGet({ PartNumber: '1' }, (err, data) => {
checkContentLength(data.ContentLength, 10);
checkIntegerHeader(data.ContentLength, 10);
const md5Hash = crypto.createHash('md5');
const md5HashExpected = crypto.createHash('md5');
const expected = Buffer.alloc(10);
@ -876,6 +876,35 @@ describe('GET object', () => {
});
}));
it('should not include PartsCount response header for regular ' +
'put object', done => {
s3.putObject({
Bucket: bucketName,
Key: objectName,
Body: Buffer.alloc(10),
}, err => {
assert.ifError(err);
requestGet({ PartNumber: 1 }, (err, data) => {
assert.ifError(err);
assert.strictEqual('PartsCount' in data, false,
'PartsCount header is present.');
done();
});
});
});
it('should include PartsCount response header for mpu object',
done => {
completeMPU(orderedPartNumbers, err => {
assert.ifError(err);
return requestGet({ PartNumber: 1 }, (err, data) => {
assert.ifError(err);
checkIntegerHeader(data.PartsCount, 10);
done();
});
});
});
describe('uploadPartCopy', () => {
// The original object was composed of three parts
const partOneSize = partSize * 10;

View File

@ -1,4 +1,5 @@
const assert = require('assert');
const async = require('async');
const { errors } = require('arsenal');
const moment = require('moment');
const Promise = require('bluebird');
@ -11,6 +12,7 @@ const changeLockPromise = Promise.promisify(changeObjectLock);
const bucketName = 'alexbucketnottaken';
const objectName = 'someObject';
const partSize = 1024 * 1024 * 5; // 5MB minimum required part size.
function checkNoError(err) {
assert.equal(err, null,
@ -466,6 +468,70 @@ describe('HEAD object, conditions', () => {
done();
});
});
it('PartNumber is set & PartsCount is absent because object is not ' +
'multipart', done => {
requestHead({ PartNumber: 1 }, (err, data) => {
assert.ifError(err);
assert.strictEqual('PartsCount' in data, false,
'PartsCount header is present.');
done();
});
});
it('PartNumber is set & PartsCount appears in response for ' +
'multipart object', done => {
const mpuKey = 'mpukey';
async.waterfall([
next => s3.createMultipartUpload({
Bucket: bucketName,
Key: mpuKey,
}, next),
(data, next) => {
const uploadId = data.UploadId;
s3.uploadPart({
Bucket: bucketName,
Key: mpuKey,
UploadId: uploadId,
PartNumber: 1,
Body: Buffer.alloc(partSize).fill('a'),
}, (err, data) => next(err, uploadId, data.ETag));
},
(uploadId, etagOne, next) => s3.uploadPart({
Bucket: bucketName,
Key: mpuKey,
UploadId: uploadId,
PartNumber: 2,
Body: Buffer.alloc(partSize).fill('z'),
}, (err, data) => next(err, uploadId, etagOne, data.ETag)),
(uploadId, etagOne, etagTwo, next) =>
s3.completeMultipartUpload({
Bucket: bucketName,
Key: mpuKey,
UploadId: uploadId,
MultipartUpload: {
Parts: [{
PartNumber: 1,
ETag: etagOne,
}, {
PartNumber: 2,
ETag: etagTwo,
}],
},
}, next),
], err => {
assert.ifError(err);
s3.headObject({
Bucket: bucketName,
Key: mpuKey,
PartNumber: 1,
}, (err, data) => {
assert.ifError(err);
assert.strictEqual(data.PartsCount, 2);
done();
});
});
});
});
});

View File

@ -10,6 +10,7 @@
"scripts": {
"test-aws": "AWS_ON_AIR=true mocha -t 40000 test/",
"test-gcp": "mocha -t 40000 test/GCP/",
"test-routes": "mocha -t 40000 test/routes/",
"test": "mocha -t 40000 test/",
"test-debug": "_mocha -t 40000 test/"
},

View File

@ -103,7 +103,7 @@ function makeBackbeatRequest(params, callback) {
makeRequest(options, callback);
}
describeSkipIfAWS('backbeat routes:', () => {
describeSkipIfAWS('backbeat routes', () => {
let bucketUtil;
let s3;
@ -131,7 +131,7 @@ describeSkipIfAWS('backbeat routes:', () => {
.then(() => done());
});
describe('backbeat PUT routes:', () => {
describe('backbeat PUT routes', () => {
describe('PUT data + metadata should create a new complete object',
() => {
[{
@ -339,7 +339,7 @@ describeSkipIfAWS('backbeat routes:', () => {
});
});
});
describe('backbeat authorization checks:', () => {
describe('backbeat authorization checks', () => {
[{ method: 'PUT', resourceType: 'metadata' },
{ method: 'PUT', resourceType: 'data' }].forEach(test => {
it(`${test.method} ${test.resourceType} should respond with ` +
@ -414,7 +414,7 @@ describeSkipIfAWS('backbeat routes:', () => {
});
});
describe('GET Metadata route:', () => {
describe('GET Metadata route', () => {
beforeEach(done => makeBackbeatRequest({
method: 'PUT', bucket: TEST_BUCKET,
objectKey: TEST_KEY,
@ -439,7 +439,7 @@ describeSkipIfAWS('backbeat routes:', () => {
});
});
it('should return error if bucket does not exist ', done => {
it('should return error if bucket does not exist', done => {
makeBackbeatRequest({
method: 'GET', bucket: 'blah',
objectKey: TEST_KEY, resourceType: 'metadata',
@ -454,7 +454,7 @@ describeSkipIfAWS('backbeat routes:', () => {
});
});
it('should return error if object does not exist ', done => {
it('should return error if object does not exist', done => {
makeBackbeatRequest({
method: 'GET', bucket: TEST_BUCKET,
objectKey: 'blah', resourceType: 'metadata',
@ -469,4 +469,103 @@ describeSkipIfAWS('backbeat routes:', () => {
});
});
});
describe('Batch Delete Route', () => {
it('should batch delete a location', done => {
let versionId;
let location;
async.series([
done => s3.putObject({
Bucket: TEST_BUCKET,
Key: 'batch-delete-test-key',
Body: new Buffer('hello'),
}, done),
done => s3.getObject({
Bucket: TEST_BUCKET,
Key: 'batch-delete-test-key',
}, (err, data) => {
assert.ifError(err);
assert.strictEqual(data.Body.toString(), 'hello');
versionId = data.VersionId;
done();
}),
done => {
makeBackbeatRequest({
method: 'GET', bucket: TEST_BUCKET,
objectKey: 'batch-delete-test-key',
resourceType: 'metadata',
authCredentials: backbeatAuthCredentials,
queryObj: {
versionId,
},
}, (err, data) => {
assert.ifError(err);
assert.strictEqual(data.statusCode, 200);
const metadata = JSON.parse(
JSON.parse(data.body).Body);
location = metadata.location;
done();
});
},
done => {
const options = {
authCredentials: backbeatAuthCredentials,
hostname: ipAddress,
port: 8000,
method: 'POST',
path: '/_/backbeat/batchdelete',
requestBody:
`{"Locations":${JSON.stringify(location)}}`,
jsonResponse: true,
};
makeRequest(options, done);
},
done => s3.getObject({
Bucket: TEST_BUCKET,
Key: 'batch-delete-test-key',
}, err => {
// should error out as the location should no longer exist
assert(err);
done();
}),
], done);
});
it('should fail with error if given malformed JSON', done => {
async.series([
done => {
const options = {
authCredentials: backbeatAuthCredentials,
hostname: ipAddress,
port: 8000,
method: 'POST',
path: '/_/backbeat/batchdelete',
requestBody: 'NOTJSON',
jsonResponse: true,
};
makeRequest(options, done);
},
], err => {
assert(err);
done();
});
});
it('should skip batch delete of a non-existent location', done => {
async.series([
done => {
const options = {
authCredentials: backbeatAuthCredentials,
hostname: ipAddress,
port: 8000,
method: 'POST',
path: '/_/backbeat/batchdelete',
requestBody:
'{"Locations":' +
'[{"key":"abcdef","dataStoreName":"us-east-1"}]}',
jsonResponse: true,
};
makeRequest(options, done);
},
], done);
});
});
});