Compare commits
5 Commits
developmen ... bugfix/tes
Author | SHA1 | Date
---|---|---
Rahul Padigela | ac06057be8 |
bert-e | 46ed7e1191 |
bert-e | c22ceed180 |
Rahul Padigela | 5ebf5cebdd |
Rahul Padigela | 39bb67d16e |
@@ -22,8 +22,8 @@ if (config.backends.data === 'file' ||
           port: config.dataDaemon.port,
           dataStore: new arsenal.storage.data.file.DataFileStore(
               { dataPath: config.dataDaemon.dataPath,
-                log: config.log }),
-          log: config.log });
+                log: { logLevel: 'trace', dumpLevel: 'error' } }),
+          log: { logLevel: 'trace', dumpLevel: 'error' } });
     dataServer.setup(err => {
         if (err) {
             logger.error('Error initializing REST data server',
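This first hunk pins the standalone data daemon's logger to a verbose, hardcoded configuration instead of the service-wide `config.log`. A minimal sketch of the idea, using a hypothetical helper name that is not in the codebase:

```js
// Hypothetical helper: the data daemon always gets a trace-level logger
// with error-level dumps, regardless of the service-wide setting.
function dataDaemonLogConfig() {
    return { logLevel: 'trace', dumpLevel: 'error' };
}

console.log(dataDaemonLogConfig());
```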
@@ -56,8 +56,8 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
             if (dataToDelete) {
                 const newDataStoreName = Array.isArray(dataGetInfo) ?
                     dataGetInfo[0].dataStoreName : null;
-                data.batchDelete(dataToDelete, requestMethod,
-                    newDataStoreName, deleteLog);
+                return data.batchDelete(dataToDelete, requestMethod,
+                    newDataStoreName, deleteLog, err => callback(err, result));
             }
             return callback(null, result);
         });
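The theme of this whole changeset starts here: `data.batchDelete` grows a callback, so a previously fire-and-forget cleanup now reports its outcome to the caller. A reduced sketch of the pattern, with stand-in names (not the repository's API):

```js
// Sketch: a store step followed by callback-aware cleanup. The caller
// now learns whether the old data was actually deleted.
function storeAndCleanUp(store, cleanUp, callback) {
    store((err, result, dataToDelete) => {
        if (err) {
            return callback(err);
        }
        if (dataToDelete) {
            // propagate the batch-delete outcome instead of dropping it
            return cleanUp(dataToDelete, cleanupErr =>
                callback(cleanupErr, result));
        }
        return callback(null, result);
    });
}

// Usage with stubs:
storeAndCleanUp(
    cb => cb(null, 'md-result', ['old-location']),
    (locations, cb) => cb(null),
    (err, result) => console.log(err, result));
```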
@@ -27,8 +27,14 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
             });
             const dataToDelete = [];
             dataToDelete.push(dataRetrievalInfo);
-            data.batchDelete(dataToDelete, null, null, log);
-            return cb(errors.BadDigest);
+            return data.batchDelete(dataToDelete, null, null, log, err => {
+                if (err) {
+                    // failure of batch delete is only logged, client gets the
+                    // error code about the md mismatch
+                    log.error('error deleting old data', { error: err });
+                }
+                return cb(errors.BadDigest);
+            });
         }
         return cb(null, dataRetrievalInfo, completedHash);
     }
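Here the cleanup failure is deliberately not surfaced: the client's real problem is the digest mismatch, so a delete error is only logged and `BadDigest` still wins. A minimal sketch of that "log but do not mask" policy, with illustrative names:

```js
// Sketch: the client error takes precedence over any cleanup failure,
// which is only logged. `batchDelete` and `log` are stand-ins.
function rollBackOnDigestMismatch(batchDelete, locations, log, cb) {
    return batchDelete(locations, err => {
        if (err) {
            // cleanup failure is logged; the client still sees BadDigest
            log.error('error deleting old data', { error: err });
        }
        return cb(new Error('BadDigest'));
    });
}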
@@ -363,10 +363,19 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                     Array.isArray(dataLocations) && dataLocations[0] ?
                     dataLocations[0].dataStoreName : null;
                 if (sanityCheckPassed) {
-                    data.batchDelete(dataToDelete, request.method,
-                        newDataStoreName,
-                        logger.newRequestLoggerFromSerializedUids(log
-                            .getSerializedUids()));
+                    const delLog =
+                        logger.newRequestLoggerFromSerializedUids(log
+                            .getSerializedUids());
+                    return data.batchDelete(dataToDelete,
+                        request.method,
+                        newDataStoreName, delLog, err => {
+                            if (err) {
+                                return next(err);
+                            }
+                            return next(null, mpuBucket, keysToDelete,
+                                aggregateETag, extraPartLocations,
+                                destinationBucket, generatedVersionId);
+                        });
                 }
             }
             return next(null, mpuBucket, keysToDelete, aggregateETag,
@@ -377,13 +386,25 @@ function completeMultipartUpload(authInfo, request, log, callback) {
         function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
             extraPartLocations, destinationBucket, generatedVersionId, next) {
             services.batchDeleteObjectMetadata(mpuBucket.getName(),
-                keysToDelete, log, err => next(err, destinationBucket,
-                    aggregateETag, generatedVersionId));
+                keysToDelete, log, err => next(err, extraPartLocations,
+                    destinationBucket, aggregateETag, generatedVersionId));
+        },
+        function batchDeleteExtraParts(extraPartLocations, destinationBucket,
+            aggregateETag, generatedVersionId, next) {
             if (extraPartLocations && extraPartLocations.length > 0) {
-                data.batchDelete(extraPartLocations, request.method, null,
-                    logger.newRequestLoggerFromSerializedUids(log
-                        .getSerializedUids()));
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(extraPartLocations, request.method,
+                    null, delLog, err => {
+                        if (err) {
+                            return next(err);
+                        }
+                        return next(null, destinationBucket, aggregateETag,
+                            generatedVersionId);
+                    });
             }
+            return next(null, destinationBucket, aggregateETag,
+                generatedVersionId);
         },
     ], (err, destinationBucket, aggregateETag, generatedVersionId) => {
         const resHeaders =
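The two `completeMultipartUpload` hunks extract an inline batch delete into its own `async.waterfall` step (`batchDeleteExtraParts`), so a delete error can flow through the waterfall instead of vanishing. An illustrative reduction under assumed names:

```js
// Sketch: splitting cleanup into a dedicated waterfall step whose error
// propagates to the final callback. `batchDelete` is a stand-in.
const async = require('async');

function finishUpload(batchDelete, extraPartLocations, done) {
    async.waterfall([
        next => next(null, extraPartLocations),
        // new dedicated step: delete extra parts, propagate any error
        (locations, next) => {
            if (locations && locations.length > 0) {
                return batchDelete(locations, err => next(err));
            }
            return next(null);
        },
    ], done);
}

// Usage: a batchDelete stub that always succeeds.
finishUpload((locations, cb) => cb(null), ['part1'], err => {
    if (err) {
        throw err;
    }
    console.log('upload completed, extra parts removed');
});
```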
@@ -423,26 +423,44 @@ function objectCopy(authInfo, request, sourceBucket,
                     log.debug('error storing new metadata', { error: err });
                     return next(err, null, destBucketMD);
                 }
-                // Clean up any potential orphans in data if object
-                // put is an overwrite of already existing
-                // object with same name, so long as the source is not
-                // the same as the destination
-                if (!sourceIsDestination && dataToDelete) {
-                    const newDataStoreName =
-                        storeMetadataParams.dataStoreName;
-                    data.batchDelete(dataToDelete, request.method,
-                        newDataStoreName,
-                        logger.newRequestLoggerFromSerializedUids(
-                            log.getSerializedUids()));
-                }
                 const sourceObjSize = storeMetadataParams.size;
                 const destObjPrevSize = (destObjMD &&
                     destObjMD['content-length'] !== undefined) ?
                     destObjMD['content-length'] : null;
-                return next(null, result, destBucketMD, storeMetadataParams,
-                    serverSideEncryption, sourceObjSize, destObjPrevSize);
+                return next(null, dataToDelete, result, destBucketMD,
+                    storeMetadataParams, serverSideEncryption,
+                    sourceObjSize, destObjPrevSize);
             });
         },
+        function deleteExistingData(dataToDelete, storingNewMdResult,
+            destBucketMD, storeMetadataParams, serverSideEncryption,
+            sourceObjSize, destObjPrevSize, next) {
+            // Clean up any potential orphans in data if object
+            // put is an overwrite of already existing
+            // object with same name, so long as the source is not
+            // the same as the destination
+            if (!sourceIsDestination && dataToDelete) {
+                const newDataStoreName = storeMetadataParams.dataStoreName;
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(dataToDelete, request.method,
+                    newDataStoreName, delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        next(null,
+                            storingNewMdResult, destBucketMD, storeMetadataParams,
+                            serverSideEncryption, sourceObjSize, destObjPrevSize);
+                    });
+            }
+            return next(null,
+                storingNewMdResult, destBucketMD, storeMetadataParams,
+                serverSideEncryption, sourceObjSize, destObjPrevSize);
+        },
     ], (err, storingNewMdResult, destBucketMD, storeMetadataParams,
         serverSideEncryption, sourceObjSize, destObjPrevSize) => {
         const corsHeaders = collectCorsHeaders(request.headers.origin,
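In `objectCopy` the cleanup moves into its own `deleteExistingData` waterfall step, and a delete failure is logged but deliberately swallowed, since the client's copy has already succeeded. A reduced sketch of a step factory with that policy, under assumed names:

```js
// Sketch: build a waterfall step that logs a cleanup failure and
// continues, because the new object is already in place.
function deleteExistingDataStep(batchDelete, log, sourceIsDestination) {
    return (dataToDelete, result, next) => {
        if (sourceIsDestination || !dataToDelete) {
            return next(null, result);
        }
        return batchDelete(dataToDelete, err => {
            if (err) {
                // not surfaced to the client; the copy already succeeded
                log.error('error deleting existing data', { error: err });
            }
            return next(null, result);
        });
    };
}
```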
@@ -297,18 +297,37 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
                         { error: err, method: 'storeNewPartMetadata' });
                     return next(err);
                 }
-                // Clean up the old data now that new metadata (with new
-                // data locations) has been stored
-                if (oldLocations) {
-                    data.batchDelete(oldLocations, request.method, null,
-                        logger.newRequestLoggerFromSerializedUids(
-                            log.getSerializedUids()));
-                }
-                return next(null, destBucketMD, totalHash, lastModified,
-                    sourceVerId, serverSideEncryption, prevObjectSize,
-                    copyObjectSize);
+                return next(null, oldLocations, destBucketMD, totalHash,
+                    lastModified, sourceVerId, serverSideEncryption,
+                    prevObjectSize, copyObjectSize);
             });
         },
+        function cleanupExistingData(oldLocations, destBucketMD, totalHash,
+            lastModified, sourceVerId, serverSideEncryption,
+            prevObjectSize, copyObjectSize, next) {
+            // Clean up the old data now that new metadata (with new
+            // data locations) has been stored
+            if (oldLocations) {
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(oldLocations, request.method, null,
+                    delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        return next(null, destBucketMD, totalHash,
+                            lastModified, sourceVerId, serverSideEncryption,
+                            prevObjectSize, copyObjectSize);
+                    });
+            }
+            return next(null, destBucketMD, totalHash,
+                lastModified, sourceVerId, serverSideEncryption,
+                prevObjectSize, copyObjectSize);
+        },
     ], (err, destBucketMD, totalHash, lastModified, sourceVerId,
         serverSideEncryption, prevObjectSize, copyObjectSize) => {
         const corsHeaders = collectCorsHeaders(request.headers.origin,
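`objectPutCopyPart` (and `objectPutPart` in the next hunk) get the same treatment; the mechanic worth noting is how `oldLocations` is threaded through `next()` so the following waterfall step receives it as an argument. A reduced, runnable sketch with stand-in values:

```js
// Sketch: threading a value through async.waterfall. The storing step
// passes `oldLocations` forward instead of consuming it; the new
// cleanup step receives it first.
const async = require('async');

async.waterfall([
    next => {
        const oldLocations = ['loc1', 'loc2']; // stand-in value
        // hand cleanup input to the next step rather than acting on it
        next(null, oldLocations, 'resultOfStore');
    },
    (oldLocations, storeResult, next) => {
        // dedicated cleanup step; errors here would be logged, not returned
        console.log('would delete:', oldLocations);
        next(null, storeResult);
    },
], (err, storeResult) => {
    if (err) {
        throw err;
    }
    console.log('done:', storeResult);
});
```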
@@ -324,19 +324,34 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
                     });
                     return next(err, destinationBucket);
                 }
-                // Clean up any old data now that new metadata (with new
-                // data locations) has been stored.
-                if (oldLocations) {
-                    log.trace('Overwriting MPU part, deleting data');
-                    data.batchDelete(oldLocations, request.method,
-                        objectLocationConstraint,
-                        logger.newRequestLoggerFromSerializedUids(log
-                            .getSerializedUids()));
-                }
-                return next(null, destinationBucket,
-                    hexDigest, prevObjectSize);
+                return next(null, oldLocations, objectLocationConstraint,
+                    destinationBucket, hexDigest, prevObjectSize);
             });
         },
+        // Clean up any old data now that new metadata (with new
+        // data locations) has been stored.
+        (oldLocations, objectLocationConstraint, destinationBucket, hexDigest,
+            prevObjectSize, next) => {
+            if (oldLocations) {
+                log.trace('overwriting mpu part, deleting data');
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(oldLocations, request.method,
+                    objectLocationConstraint, delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        return next(null, destinationBucket, hexDigest,
+                            prevObjectSize);
+                    });
+            }
+            return next(null, destinationBucket, hexDigest,
+                prevObjectSize);
+        },
     ], (err, destinationBucket, hexDigest, prevObjectSize) => {
         const corsHeaders = collectCorsHeaders(request.headers.origin,
             request.method, destinationBucket);
@@ -117,6 +117,14 @@ const multipleBackendGateway = {
         return client.delete(objectGetInfo, reqUids, callback);
     },

+    batchDelete: (dataStoreName, keys, log, callback) => {
+        const client = clients[dataStoreName];
+        if (client.batchDelete) {
+            return client.batchDelete(keys, log.getSerializedUids(), callback);
+        }
+        return callback(errors.NotImplemented);
+    },
+
    healthcheck: (flightCheckOnStartUp, log, callback) => {
        const multBackendResp = {};
        const awsArray = [];
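The gateway's new `batchDelete` is a capability dispatch: use the backend client's native batch delete when it exists, otherwise answer `NotImplemented` so the caller can fall back. A minimal sketch; the `clients` map and the error shape are assumptions for illustration:

```js
// Sketch: capability-based dispatch to a backend's batch delete.
const clients = {
    sproxyd: {
        batchDelete: (keys, reqUids, cb) => cb(null),
    },
};

function gatewayBatchDelete(dataStoreName, keys, reqUids, callback) {
    const client = clients[dataStoreName];
    if (client && client.batchDelete) {
        return client.batchDelete(keys, reqUids, callback);
    }
    // backend cannot batch delete; caller falls back to per-key deletes
    return callback(new Error('NotImplemented'));
}

gatewayBatchDelete('sproxyd', ['k1', 'k2'], 'uids', err =>
    console.log('batch delete result:', err || 'ok'));
```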
@@ -256,31 +256,45 @@ const data = {
             return callback(err);
         });
     },
-    // It would be preferable to have an sproxyd batch delete route to
-    // replace this
-    batchDelete: (locations, requestMethod, newObjDataStoreName, log) => {
+    batchDelete: (locations, requestMethod, newObjDataStoreName, log, cb) => {
         // TODO: The method of persistence of sproxy delete key will
         // be finalized; refer Issue #312 for the discussion. In the
         // meantime, we at least log the location of the data we are
         // about to delete before attempting its deletion.
         if (_shouldSkipDelete(locations, requestMethod, newObjDataStoreName)) {
-            return;
+            return process.nextTick(cb);
         }
         log.trace('initiating batch delete', {
             keys: locations,
             implName,
             method: 'batchDelete',
         });
-        async.eachLimit(locations, 5, (loc, next) => {
+        const keys = [];
+        const shouldBatchDelete = locations.every(l => {
+            if (typeof l === 'string') {
+                keys.push(l);
+                return true;
+            }
+            if (l.dataStoreName === 'sproxyd') {
+                keys.push(l.key);
+                return true;
+            }
+            return false;
+        });
+        if (shouldBatchDelete) {
+            return client.batchDelete('sproxyd', keys, log, cb);
+        }
+        return async.eachLimit(locations, 5, (loc, next) => {
             process.nextTick(() => data.delete(loc, log, next));
         },
         err => {
             if (err) {
-                log.error('batch delete failed', { error: err });
-            } else {
-                log.trace('batch delete successfully completed');
+                log.end().error('batch delete failed', { error: err });
+                return cb(err);
             }
-            log.end();
+            log.end().trace('batch delete successfully completed');
+            return cb();
         });
     },
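This is the core of the changeset: `data.batchDelete` now takes a callback, and when every location is either a plain key or lives on sproxyd, it collects the keys and issues a single backend batch delete; otherwise it falls back to per-location deletes with bounded concurrency. A standalone sketch of that dispatch logic, with the backend calls abstracted as parameters:

```js
// Sketch of the dispatch introduced here (names assumed): one backend
// batch delete when possible, otherwise per-location deletes with a
// concurrency limit of 5, always reporting the outcome via `cb`.
const async = require('async');

function batchDelete(locations, deleteOne, sproxydBatchDelete, cb) {
    const keys = [];
    const shouldBatchDelete = locations.every(l => {
        if (typeof l === 'string') {
            keys.push(l);
            return true;
        }
        if (l.dataStoreName === 'sproxyd') {
            keys.push(l.key);
            return true;
        }
        return false; // mixed backends: fall back to per-key deletes
    });
    if (shouldBatchDelete) {
        return sproxydBatchDelete(keys, cb);
    }
    return async.eachLimit(locations, 5,
        (loc, next) => process.nextTick(() => deleteOne(loc, next)),
        cb);
}

// Usage with stubs:
batchDelete(['k1', { dataStoreName: 'sproxyd', key: 'k2' }],
    (loc, next) => next(null),
    (keys, done) => { console.log('batch deleting', keys); done(null); },
    err => console.log('done:', err || 'ok'));
```

Note that `every` short-circuits, so a mixed list is detected on the first non-sproxyd location and the slower per-location path is used instead.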
@@ -250,16 +250,21 @@ const services = {
             if (err) {
                 return cb(err, res);
             }
-            cb(null, res); // this is smart
             log.trace('deleteObject: metadata delete OK');
             const deleteLog = logger.newRequestLogger();
             if (objectMD.location === null) {
-                return undefined;
+                return cb(null, res);
             } else if (!Array.isArray(objectMD.location)) {
-                return data.delete(objectMD.location, deleteLog);
+                data.delete(objectMD.location, deleteLog);
+                return cb(null, res);
             }
             return data.batchDelete(objectMD.location, null, null,
-                deleteLog);
+                deleteLog, err => {
+                    if (err) {
+                        return cb(err);
+                    }
+                    return cb(null, res);
+                });
         });
     }
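This hunk removes the premature `cb(null, res)` (the "this is smart" line), which could fire the callback before, and in addition to, the data-deletion paths. After the change, each branch invokes the callback exactly once, after the deletion outcome is known. A simplified sketch under assumed names:

```js
// Sketch of the fix: exactly one completion per branch, after the
// data-deletion outcome is known. All names are stand-ins.
function deleteObject(deleteMetadata, deleteData, locations, cb) {
    deleteMetadata((err, res) => {
        if (err) {
            return cb(err, res);
        }
        if (locations === null) {
            return cb(null, res); // no data to delete
        }
        return deleteData(locations, delErr =>
            (delErr ? cb(delErr) : cb(null, res)));
    });
}
```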
@@ -31,7 +31,7 @@
     "mongodb": "^2.2.31",
     "node-uuid": "^1.4.3",
     "npm-run-all": "~4.1.5",
-    "sproxydclient": "scality/sproxydclient#6a391f8d",
+    "sproxydclient": "scality/sproxydclient#a6ec980",
     "utapi": "scality/utapi#178666f",
     "utf8": "~2.1.1",
     "uuid": "^3.0.1",