Compare commits

...

5 Commits

Author SHA1 Message Date
Rahul Padigela ac06057be8 bugfix - test for datastore errors 2019-06-03 12:25:00 -07:00
bert-e 46ed7e1191 Merge branch 'w/7.4/feature/S3C-1139-sproxyd-batchdelete' into tmp/octopus/w/7.5/feature/S3C-1139-sproxyd-batchdelete 2019-06-03 17:13:47 +00:00
bert-e c22ceed180 Merge branch 'feature/S3C-1139-sproxyd-batchdelete' into tmp/octopus/w/7.4/feature/S3C-1139-sproxyd-batchdelete 2019-06-03 17:13:47 +00:00
Rahul Padigela 5ebf5cebdd bugfix: S3C-1139 use batch delete in api methods
This commits makes the batch delete invocation to use callbacks
so that orphans are not created when sproxyd is unstable.
2019-06-03 10:12:30 -07:00
Rahul Padigela 39bb67d16e improvement: implement batch delete for sproxydclient
This commit implements the sproxyd's batch delete method in the
data abstractions so that APIs can leverage it for deleting large
number of keys. It also changes the batchDelete function's signature
by adding a callback. It is expected for the api methods to use callback
when invoking the method.
2019-06-03 10:12:23 -07:00
11 changed files with 169 additions and 63 deletions

View File

@ -22,8 +22,8 @@ if (config.backends.data === 'file' ||
port: config.dataDaemon.port, port: config.dataDaemon.port,
dataStore: new arsenal.storage.data.file.DataFileStore( dataStore: new arsenal.storage.data.file.DataFileStore(
{ dataPath: config.dataDaemon.dataPath, { dataPath: config.dataDaemon.dataPath,
log: config.log }), log: { logLevel: 'trace', dumpLevel: 'error' } }),
log: config.log }); log: { logLevel: 'trace', dumpLevel: 'error' } });
dataServer.setup(err => { dataServer.setup(err => {
if (err) { if (err) {
logger.error('Error initializing REST data server', logger.error('Error initializing REST data server',

View File

@ -56,8 +56,8 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
if (dataToDelete) { if (dataToDelete) {
const newDataStoreName = Array.isArray(dataGetInfo) ? const newDataStoreName = Array.isArray(dataGetInfo) ?
dataGetInfo[0].dataStoreName : null; dataGetInfo[0].dataStoreName : null;
data.batchDelete(dataToDelete, requestMethod, return data.batchDelete(dataToDelete, requestMethod,
newDataStoreName, deleteLog); newDataStoreName, deleteLog, err => callback(err, result));
} }
return callback(null, result); return callback(null, result);
}); });

View File

@ -27,8 +27,14 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
}); });
const dataToDelete = []; const dataToDelete = [];
dataToDelete.push(dataRetrievalInfo); dataToDelete.push(dataRetrievalInfo);
data.batchDelete(dataToDelete, null, null, log); return data.batchDelete(dataToDelete, null, null, log, err => {
if (err) {
// failure of batch delete is only logged, client gets the
// error code about the md mismatch
log.error('error deleting old data', { error: err });
}
return cb(errors.BadDigest); return cb(errors.BadDigest);
});
} }
return cb(null, dataRetrievalInfo, completedHash); return cb(null, dataRetrievalInfo, completedHash);
} }

View File

@ -363,10 +363,19 @@ function completeMultipartUpload(authInfo, request, log, callback) {
Array.isArray(dataLocations) && dataLocations[0] ? Array.isArray(dataLocations) && dataLocations[0] ?
dataLocations[0].dataStoreName : null; dataLocations[0].dataStoreName : null;
if (sanityCheckPassed) { if (sanityCheckPassed) {
data.batchDelete(dataToDelete, request.method, const delLog =
newDataStoreName,
logger.newRequestLoggerFromSerializedUids(log logger.newRequestLoggerFromSerializedUids(log
.getSerializedUids())); .getSerializedUids());
return data.batchDelete(dataToDelete,
request.method,
newDataStoreName, delLog, err => {
if (err) {
return next(err);
}
return next(null, mpuBucket, keysToDelete,
aggregateETag, extraPartLocations,
destinationBucket, generatedVersionId);
});
} }
} }
return next(null, mpuBucket, keysToDelete, aggregateETag, return next(null, mpuBucket, keysToDelete, aggregateETag,
@ -377,13 +386,25 @@ function completeMultipartUpload(authInfo, request, log, callback) {
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag, function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
extraPartLocations, destinationBucket, generatedVersionId, next) { extraPartLocations, destinationBucket, generatedVersionId, next) {
services.batchDeleteObjectMetadata(mpuBucket.getName(), services.batchDeleteObjectMetadata(mpuBucket.getName(),
keysToDelete, log, err => next(err, destinationBucket, keysToDelete, log, err => next(err, extraPartLocations,
aggregateETag, generatedVersionId)); destinationBucket, aggregateETag, generatedVersionId));
},
function batchDeleteExtraParts(extraPartLocations, destinationBucket,
aggregateETag, generatedVersionId, next) {
if (extraPartLocations && extraPartLocations.length > 0) { if (extraPartLocations && extraPartLocations.length > 0) {
data.batchDelete(extraPartLocations, request.method, null, const delLog = logger.newRequestLoggerFromSerializedUids(
logger.newRequestLoggerFromSerializedUids(log log.getSerializedUids());
.getSerializedUids())); return data.batchDelete(extraPartLocations, request.method,
null, delLog, err => {
if (err) {
return next(err);
} }
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
});
}
return next(null, destinationBucket, aggregateETag,
generatedVersionId);
}, },
], (err, destinationBucket, aggregateETag, generatedVersionId) => { ], (err, destinationBucket, aggregateETag, generatedVersionId) => {
const resHeaders = const resHeaders =

View File

@ -423,25 +423,43 @@ function objectCopy(authInfo, request, sourceBucket,
log.debug('error storing new metadata', { error: err }); log.debug('error storing new metadata', { error: err });
return next(err, null, destBucketMD); return next(err, null, destBucketMD);
} }
const sourceObjSize = storeMetadataParams.size;
const destObjPrevSize = (destObjMD &&
destObjMD['content-length'] !== undefined) ?
destObjMD['content-length'] : null;
return next(null, dataToDelete, result, destBucketMD,
storeMetadataParams, serverSideEncryption,
sourceObjSize, destObjPrevSize);
});
},
function deleteExistingData(dataToDelete, storingNewMdResult,
destBucketMD, storeMetadataParams, serverSideEncryption,
sourceObjSize, destObjPrevSize, next) {
// Clean up any potential orphans in data if object // Clean up any potential orphans in data if object
// put is an overwrite of already existing // put is an overwrite of already existing
// object with same name, so long as the source is not // object with same name, so long as the source is not
// the same as the destination // the same as the destination
if (!sourceIsDestination && dataToDelete) { if (!sourceIsDestination && dataToDelete) {
const newDataStoreName = const newDataStoreName = storeMetadataParams.dataStoreName;
storeMetadataParams.dataStoreName; const delLog = logger.newRequestLoggerFromSerializedUids(
data.batchDelete(dataToDelete, request.method, log.getSerializedUids());
newDataStoreName, return data.batchDelete(dataToDelete, request.method,
logger.newRequestLoggerFromSerializedUids( newDataStoreName, delLog, err => {
log.getSerializedUids())); if (err) {
// if error, log the error and move on as it is not
// relevant to the client as the client's
// object already succeeded putting data, metadata
log.error('error deleting existing data',
{ error: err });
} }
const sourceObjSize = storeMetadataParams.size; next(null,
const destObjPrevSize = (destObjMD && storingNewMdResult, destBucketMD, storeMetadataParams,
destObjMD['content-length'] !== undefined) ?
destObjMD['content-length'] : null;
return next(null, result, destBucketMD, storeMetadataParams,
serverSideEncryption, sourceObjSize, destObjPrevSize); serverSideEncryption, sourceObjSize, destObjPrevSize);
}); });
}
return next(null,
storingNewMdResult, destBucketMD, storeMetadataParams,
serverSideEncryption, sourceObjSize, destObjPrevSize);
}, },
], (err, storingNewMdResult, destBucketMD, storeMetadataParams, ], (err, storingNewMdResult, destBucketMD, storeMetadataParams,
serverSideEncryption, sourceObjSize, destObjPrevSize) => { serverSideEncryption, sourceObjSize, destObjPrevSize) => {

View File

@ -297,17 +297,36 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
{ error: err, method: 'storeNewPartMetadata' }); { error: err, method: 'storeNewPartMetadata' });
return next(err); return next(err);
} }
return next(null, oldLocations, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize);
});
},
function cleanupExistingData(oldLocations, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize, next) {
// Clean up the old data now that new metadata (with new // Clean up the old data now that new metadata (with new
// data locations) has been stored // data locations) has been stored
if (oldLocations) { if (oldLocations) {
data.batchDelete(oldLocations, request.method, null, const delLog = logger.newRequestLoggerFromSerializedUids(
logger.newRequestLoggerFromSerializedUids( log.getSerializedUids());
log.getSerializedUids())); return data.batchDelete(oldLocations, request.method, null,
delLog, err => {
if (err) {
// if error, log the error and move on as it is not
// relevant to the client as the client's
// object already succeeded putting data, metadata
log.error('error deleting existing data',
{ error: err });
} }
return next(null, destBucketMD, totalHash, lastModified, return next(null, destBucketMD, totalHash,
sourceVerId, serverSideEncryption, prevObjectSize, lastModified, sourceVerId, serverSideEncryption,
copyObjectSize); prevObjectSize, copyObjectSize);
}); });
}
return next(null, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize);
}, },
], (err, destBucketMD, totalHash, lastModified, sourceVerId, ], (err, destBucketMD, totalHash, lastModified, sourceVerId,
serverSideEncryption, prevObjectSize, copyObjectSize) => { serverSideEncryption, prevObjectSize, copyObjectSize) => {

View File

@ -324,18 +324,33 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
}); });
return next(err, destinationBucket); return next(err, destinationBucket);
} }
return next(null, oldLocations, objectLocationConstraint,
destinationBucket, hexDigest, prevObjectSize);
});
},
// Clean up any old data now that new metadata (with new // Clean up any old data now that new metadata (with new
// data locations) has been stored. // data locations) has been stored.
(oldLocations, objectLocationConstraint, destinationBucket, hexDigest,
prevObjectSize, next) => {
if (oldLocations) { if (oldLocations) {
log.trace('Overwriting MPU part, deleting data'); log.trace('overwriting mpu part, deleting data');
data.batchDelete(oldLocations, request.method, const delLog = logger.newRequestLoggerFromSerializedUids(
objectLocationConstraint, log.getSerializedUids());
logger.newRequestLoggerFromSerializedUids(log return data.batchDelete(oldLocations, request.method,
.getSerializedUids())); objectLocationConstraint, delLog, err => {
if (err) {
// if error, log the error and move on as it is not
// relevant to the client as the client's
// object already succeeded putting data, metadata
log.error('error deleting existing data',
{ error: err });
} }
return next(null, destinationBucket, return next(null, destinationBucket, hexDigest,
hexDigest, prevObjectSize); prevObjectSize);
}); });
}
return next(null, destinationBucket, hexDigest,
prevObjectSize);
}, },
], (err, destinationBucket, hexDigest, prevObjectSize) => { ], (err, destinationBucket, hexDigest, prevObjectSize) => {
const corsHeaders = collectCorsHeaders(request.headers.origin, const corsHeaders = collectCorsHeaders(request.headers.origin,

View File

@ -117,6 +117,14 @@ const multipleBackendGateway = {
return client.delete(objectGetInfo, reqUids, callback); return client.delete(objectGetInfo, reqUids, callback);
}, },
batchDelete: (dataStoreName, keys, log, callback) => {
const client = clients[dataStoreName];
if (client.batchDelete) {
return client.batchDelete(keys, log.getSerializedUids(), callback);
}
return callback(errors.NotImplemented);
},
healthcheck: (flightCheckOnStartUp, log, callback) => { healthcheck: (flightCheckOnStartUp, log, callback) => {
const multBackendResp = {}; const multBackendResp = {};
const awsArray = []; const awsArray = [];

View File

@ -256,31 +256,45 @@ const data = {
return callback(err); return callback(err);
}); });
}, },
// It would be preferable to have an sproxyd batch delete route to
// replace this batchDelete: (locations, requestMethod, newObjDataStoreName, log, cb) => {
batchDelete: (locations, requestMethod, newObjDataStoreName, log) => {
// TODO: The method of persistence of sproxy delete key will // TODO: The method of persistence of sproxy delete key will
// be finalized; refer Issue #312 for the discussion. In the // be finalized; refer Issue #312 for the discussion. In the
// meantime, we at least log the location of the data we are // meantime, we at least log the location of the data we are
// about to delete before attempting its deletion. // about to delete before attempting its deletion.
if (_shouldSkipDelete(locations, requestMethod, newObjDataStoreName)) { if (_shouldSkipDelete(locations, requestMethod, newObjDataStoreName)) {
return; return process.nextTick(cb);
} }
log.trace('initiating batch delete', { log.trace('initiating batch delete', {
keys: locations, keys: locations,
implName, implName,
method: 'batchDelete', method: 'batchDelete',
}); });
async.eachLimit(locations, 5, (loc, next) => { const keys = [];
const shouldBatchDelete = locations.every(l => {
if (typeof l === 'string') {
keys.push(l);
return true;
}
if (l.dataStoreName === 'sproxyd') {
keys.push(l.key);
return true;
}
return false;
});
if (shouldBatchDelete) {
return client.batchDelete('sproxyd', keys, log, cb);
}
return async.eachLimit(locations, 5, (loc, next) => {
process.nextTick(() => data.delete(loc, log, next)); process.nextTick(() => data.delete(loc, log, next));
}, },
err => { err => {
if (err) { if (err) {
log.error('batch delete failed', { error: err }); log.end().error('batch delete failed', { error: err });
} else { return cb(err);
log.trace('batch delete successfully completed');
} }
log.end(); log.end().trace('batch delete successfully completed');
return cb();
}); });
}, },

View File

@ -250,16 +250,21 @@ const services = {
if (err) { if (err) {
return cb(err, res); return cb(err, res);
} }
cb(null, res); // this is smart
log.trace('deleteObject: metadata delete OK'); log.trace('deleteObject: metadata delete OK');
const deleteLog = logger.newRequestLogger(); const deleteLog = logger.newRequestLogger();
if (objectMD.location === null) { if (objectMD.location === null) {
return undefined; return cb(null, res);
} else if (!Array.isArray(objectMD.location)) { } else if (!Array.isArray(objectMD.location)) {
return data.delete(objectMD.location, deleteLog); data.delete(objectMD.location, deleteLog);
return cb(null, res);
} }
return data.batchDelete(objectMD.location, null, null, return data.batchDelete(objectMD.location, null, null,
deleteLog); deleteLog, err => {
if (err) {
return cb(err);
}
return cb(null, res);
});
}); });
} }

View File

@ -31,7 +31,7 @@
"mongodb": "^2.2.31", "mongodb": "^2.2.31",
"node-uuid": "^1.4.3", "node-uuid": "^1.4.3",
"npm-run-all": "~4.1.5", "npm-run-all": "~4.1.5",
"sproxydclient": "scality/sproxydclient#6a391f8d", "sproxydclient": "scality/sproxydclient#a6ec980",
"utapi": "scality/utapi#178666f", "utapi": "scality/utapi#178666f",
"utf8": "~2.1.1", "utf8": "~2.1.1",
"uuid": "^3.0.1", "uuid": "^3.0.1",