Compare commits
5 Commits: developmen...bugfix/tes

| Author | SHA1 | Date |
| --- | --- | --- |
| Rahul Padigela | ac06057be8 | |
| bert-e | 46ed7e1191 | |
| bert-e | c22ceed180 | |
| Rahul Padigela | 5ebf5cebdd | |
| Rahul Padigela | 39bb67d16e | |
```diff
@@ -22,8 +22,8 @@ if (config.backends.data === 'file' ||
     port: config.dataDaemon.port,
     dataStore: new arsenal.storage.data.file.DataFileStore(
         { dataPath: config.dataDaemon.dataPath,
-          log: config.log }),
-    log: config.log });
+          log: { logLevel: 'trace', dumpLevel: 'error' } }),
+    log: { logLevel: 'trace', dumpLevel: 'error' } });
 dataServer.setup(err => {
     if (err) {
         logger.error('Error initializing REST data server',
```
```diff
@@ -56,8 +56,8 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
         if (dataToDelete) {
             const newDataStoreName = Array.isArray(dataGetInfo) ?
                 dataGetInfo[0].dataStoreName : null;
-            data.batchDelete(dataToDelete, requestMethod,
-                newDataStoreName, deleteLog);
+            return data.batchDelete(dataToDelete, requestMethod,
+                newDataStoreName, deleteLog, err => callback(err, result));
         }
         return callback(null, result);
     });
```
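Review note: this hunk is the template for the whole changeset. `data.batchDelete` grows a final callback so callers can sequence completion instead of firing and forgetting. A minimal runnable sketch of the contract change (every stub below is hypothetical; only the call shape comes from the diff):

```js
// Minimal runnable sketch of the new contract; all names here are
// hypothetical stubs, only the call shape comes from the diff.
const data = {
    // New signature: a completion callback is now the final argument.
    batchDelete(locations, requestMethod, newObjDataStoreName, log, cb) {
        // Stand-in for the real deletion work.
        setImmediate(() => cb(locations.length ? null : new Error('empty')));
    },
};

function storeInMDandDeleteData(dataToDelete, callback) {
    const result = { ok: true };
    // Before: data.batchDelete(dataToDelete, 'PUT', null, log); followed by
    // callback(null, result), so deletion raced on unobserved.
    // After: completion (and any error) is propagated to the caller.
    return data.batchDelete(dataToDelete, 'PUT', null, console,
        err => callback(err, result));
}

storeInMDandDeleteData(['locKey1', 'locKey2'],
    (err, res) => console.log('done', err, res));
```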
```diff
@@ -27,8 +27,14 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
         });
         const dataToDelete = [];
         dataToDelete.push(dataRetrievalInfo);
-        data.batchDelete(dataToDelete, null, null, log);
-        return cb(errors.BadDigest);
+        return data.batchDelete(dataToDelete, null, null, log, err => {
+            if (err) {
+                // failure of batch delete is only logged, client gets the
+                // error code about the md mismatch
+                log.error('error deleting old data', { error: err });
+            }
+            return cb(errors.BadDigest);
+        });
     }
     return cb(null, dataRetrievalInfo, completedHash);
 }
```
```diff
@@ -363,10 +363,19 @@ function completeMultipartUpload(authInfo, request, log, callback) {
                 Array.isArray(dataLocations) && dataLocations[0] ?
                     dataLocations[0].dataStoreName : null;
                 if (sanityCheckPassed) {
-                    data.batchDelete(dataToDelete, request.method,
-                        newDataStoreName,
-                        logger.newRequestLoggerFromSerializedUids(log
-                            .getSerializedUids()));
+                    const delLog =
+                        logger.newRequestLoggerFromSerializedUids(log
+                            .getSerializedUids());
+                    return data.batchDelete(dataToDelete,
+                        request.method,
+                        newDataStoreName, delLog, err => {
+                            if (err) {
+                                return next(err);
+                            }
+                            return next(null, mpuBucket, keysToDelete,
+                                aggregateETag, extraPartLocations,
+                                destinationBucket, generatedVersionId);
+                        });
                 }
             }
             return next(null, mpuBucket, keysToDelete, aggregateETag,
```
```diff
@@ -377,13 +386,25 @@ function completeMultipartUpload(authInfo, request, log, callback) {
         function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
             extraPartLocations, destinationBucket, generatedVersionId, next) {
             services.batchDeleteObjectMetadata(mpuBucket.getName(),
-                keysToDelete, log, err => next(err, destinationBucket,
-                    aggregateETag, generatedVersionId));
-            if (extraPartLocations && extraPartLocations.length > 0) {
-                data.batchDelete(extraPartLocations, request.method, null,
-                    logger.newRequestLoggerFromSerializedUids(log
-                        .getSerializedUids()));
-            }
+                keysToDelete, log, err => next(err, extraPartLocations,
+                    destinationBucket, aggregateETag, generatedVersionId));
+        },
+        function batchDeleteExtraParts(extraPartLocations, destinationBucket,
+            aggregateETag, generatedVersionId, next) {
+            if (extraPartLocations && extraPartLocations.length > 0) {
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(extraPartLocations, request.method,
+                    null, delLog, err => {
+                        if (err) {
+                            return next(err);
+                        }
+                        return next(null, destinationBucket, aggregateETag,
+                            generatedVersionId);
+                    });
+            }
+            return next(null, destinationBucket, aggregateETag,
+                generatedVersionId);
         },
     ], (err, destinationBucket, aggregateETag, generatedVersionId) => {
         const resHeaders =
```
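Review note: the extra-part cleanup moves out of `deletePartsMetadata` into its own waterfall step, so the chain only advances once the batch delete reports back. A self-contained sketch of that shape (the `async` usage mirrors the diff; step bodies are stand-ins):

```js
// Sketch of the new waterfall shape: cleanup is a dedicated step and the
// final callback fires only after the batch delete settles. Step bodies
// and names are hypothetical stand-ins.
const async = require('async');

// Stand-in for data.batchDelete(locations, method, dataStoreName, log, cb).
function batchDelete(locations, delLog, cb) {
    setImmediate(cb);
}

async.waterfall([
    next => next(null, ['extra-part-1', 'extra-part-2'], 'aggregateETag'),
    function batchDeleteExtraParts(extraPartLocations, aggregateETag, next) {
        if (extraPartLocations && extraPartLocations.length > 0) {
            return batchDelete(extraPartLocations, console, err => {
                if (err) {
                    return next(err); // cleanup failures now surface
                }
                return next(null, aggregateETag);
            });
        }
        return next(null, aggregateETag);
    },
], (err, aggregateETag) => console.log(err, aggregateETag));
```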
```diff
@@ -423,25 +423,43 @@ function objectCopy(authInfo, request, sourceBucket,
                     log.debug('error storing new metadata', { error: err });
                     return next(err, null, destBucketMD);
                 }
-                // Clean up any potential orphans in data if object
-                // put is an overwrite of already existing
-                // object with same name, so long as the source is not
-                // the same as the destination
-                if (!sourceIsDestination && dataToDelete) {
-                    const newDataStoreName =
-                        storeMetadataParams.dataStoreName;
-                    data.batchDelete(dataToDelete, request.method,
-                        newDataStoreName,
-                        logger.newRequestLoggerFromSerializedUids(
-                            log.getSerializedUids()));
-                }
-                const sourceObjSize = storeMetadataParams.size;
-                const destObjPrevSize = (destObjMD &&
-                    destObjMD['content-length'] !== undefined) ?
-                    destObjMD['content-length'] : null;
-                return next(null, result, destBucketMD, storeMetadataParams,
-                    serverSideEncryption, sourceObjSize, destObjPrevSize);
+                const sourceObjSize = storeMetadataParams.size;
+                const destObjPrevSize = (destObjMD &&
+                    destObjMD['content-length'] !== undefined) ?
+                    destObjMD['content-length'] : null;
+                return next(null, dataToDelete, result, destBucketMD,
+                    storeMetadataParams, serverSideEncryption,
+                    sourceObjSize, destObjPrevSize);
             });
         },
+        function deleteExistingData(dataToDelete, storingNewMdResult,
+            destBucketMD, storeMetadataParams, serverSideEncryption,
+            sourceObjSize, destObjPrevSize, next) {
+            // Clean up any potential orphans in data if object
+            // put is an overwrite of already existing
+            // object with same name, so long as the source is not
+            // the same as the destination
+            if (!sourceIsDestination && dataToDelete) {
+                const newDataStoreName = storeMetadataParams.dataStoreName;
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(dataToDelete, request.method,
+                    newDataStoreName, delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        return next(null,
+                            storingNewMdResult, destBucketMD, storeMetadataParams,
+                            serverSideEncryption, sourceObjSize, destObjPrevSize);
+                    });
+            }
+            return next(null,
+                storingNewMdResult, destBucketMD, storeMetadataParams,
+                serverSideEncryption, sourceObjSize, destObjPrevSize);
+        },
     ], (err, storingNewMdResult, destBucketMD, storeMetadataParams,
         serverSideEncryption, sourceObjSize, destObjPrevSize) => {
```
```diff
@@ -297,17 +297,36 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
                         { error: err, method: 'storeNewPartMetadata' });
                     return next(err);
                 }
-                // Clean up the old data now that new metadata (with new
-                // data locations) has been stored
-                if (oldLocations) {
-                    data.batchDelete(oldLocations, request.method, null,
-                        logger.newRequestLoggerFromSerializedUids(
-                            log.getSerializedUids()));
-                }
-                return next(null, destBucketMD, totalHash, lastModified,
-                    sourceVerId, serverSideEncryption, prevObjectSize,
-                    copyObjectSize);
+                return next(null, oldLocations, destBucketMD, totalHash,
+                    lastModified, sourceVerId, serverSideEncryption,
+                    prevObjectSize, copyObjectSize);
             });
         },
+        function cleanupExistingData(oldLocations, destBucketMD, totalHash,
+            lastModified, sourceVerId, serverSideEncryption,
+            prevObjectSize, copyObjectSize, next) {
+            // Clean up the old data now that new metadata (with new
+            // data locations) has been stored
+            if (oldLocations) {
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(oldLocations, request.method, null,
+                    delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        return next(null, destBucketMD, totalHash,
+                            lastModified, sourceVerId, serverSideEncryption,
+                            prevObjectSize, copyObjectSize);
+                    });
+            }
+            return next(null, destBucketMD, totalHash,
+                lastModified, sourceVerId, serverSideEncryption,
+                prevObjectSize, copyObjectSize);
+        },
     ], (err, destBucketMD, totalHash, lastModified, sourceVerId,
         serverSideEncryption, prevObjectSize, copyObjectSize) => {
```
```diff
@@ -324,18 +324,33 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
                     });
                     return next(err, destinationBucket);
                 }
-                // Clean up any old data now that new metadata (with new
-                // data locations) has been stored.
-                if (oldLocations) {
-                    log.trace('Overwriting MPU part, deleting data');
-                    data.batchDelete(oldLocations, request.method,
-                        objectLocationConstraint,
-                        logger.newRequestLoggerFromSerializedUids(log
-                            .getSerializedUids()));
-                }
-                return next(null, destinationBucket, hexDigest,
-                    prevObjectSize);
+                return next(null, oldLocations, objectLocationConstraint,
+                    destinationBucket, hexDigest, prevObjectSize);
             });
         },
+        // Clean up any old data now that new metadata (with new
+        // data locations) has been stored.
+        (oldLocations, objectLocationConstraint, destinationBucket, hexDigest,
+            prevObjectSize, next) => {
+            if (oldLocations) {
+                log.trace('overwriting mpu part, deleting data');
+                const delLog = logger.newRequestLoggerFromSerializedUids(
+                    log.getSerializedUids());
+                return data.batchDelete(oldLocations, request.method,
+                    objectLocationConstraint, delLog, err => {
+                        if (err) {
+                            // if error, log the error and move on as it is not
+                            // relevant to the client as the client's
+                            // object already succeeded putting data, metadata
+                            log.error('error deleting existing data',
+                                { error: err });
+                        }
+                        return next(null, destinationBucket,
+                            hexDigest, prevObjectSize);
+                    });
+            }
+            return next(null, destinationBucket, hexDigest,
+                prevObjectSize);
+        },
     ], (err, destinationBucket, hexDigest, prevObjectSize) => {
         const corsHeaders = collectCorsHeaders(request.headers.origin,
```
```diff
@@ -117,6 +117,14 @@ const multipleBackendGateway = {
         return client.delete(objectGetInfo, reqUids, callback);
     },
 
+    batchDelete: (dataStoreName, keys, log, callback) => {
+        const client = clients[dataStoreName];
+        if (client.batchDelete) {
+            return client.batchDelete(keys, log.getSerializedUids(), callback);
+        }
+        return callback(errors.NotImplemented);
+    },
+
     healthcheck: (flightCheckOnStartUp, log, callback) => {
         const multBackendResp = {};
         const awsArray = [];
```
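Review note: the gateway only forwards batch deletes to backends that actually implement them; anything else should get `NotImplemented`. A standalone sketch of the dispatch (the `clients` map, `errors` value, and log object are stubs; the capability check mirrors the diff):

```js
// Standalone sketch of the gateway dispatch; `clients`, `errors`, and the
// log object are hypothetical stubs, the capability check mirrors the diff.
const errors = { NotImplemented: new Error('NotImplemented') };
const clients = {
    sproxyd: { batchDelete: (keys, reqUids, cb) => setImmediate(cb) },
    awsbackend: {}, // no batchDelete implementation
};

function batchDelete(dataStoreName, keys, log, callback) {
    const client = clients[dataStoreName];
    if (client.batchDelete) {
        return client.batchDelete(keys, log.getSerializedUids(), callback);
    }
    return callback(errors.NotImplemented);
}

const log = { getSerializedUids: () => 'uids:1' };
batchDelete('sproxyd', ['k1', 'k2'], log, err => console.log('sproxyd', err));
batchDelete('awsbackend', ['k3'], log, err => console.log('aws', err));
```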
```diff
@@ -256,31 +256,45 @@ const data = {
             return callback(err);
         });
     },
     // It would be preferable to have an sproxyd batch delete route to
     // replace this
-    batchDelete: (locations, requestMethod, newObjDataStoreName, log) => {
+    batchDelete: (locations, requestMethod, newObjDataStoreName, log, cb) => {
         // TODO: The method of persistence of sproxy delete key will
         // be finalized; refer Issue #312 for the discussion. In the
         // meantime, we at least log the location of the data we are
         // about to delete before attempting its deletion.
         if (_shouldSkipDelete(locations, requestMethod, newObjDataStoreName)) {
-            return;
+            return process.nextTick(cb);
         }
         log.trace('initiating batch delete', {
             keys: locations,
             implName,
             method: 'batchDelete',
         });
-        async.eachLimit(locations, 5, (loc, next) => {
+        const keys = [];
+        const shouldBatchDelete = locations.every(l => {
+            if (typeof l === 'string') {
+                keys.push(l);
+                return true;
+            }
+            if (l.dataStoreName === 'sproxyd') {
+                keys.push(l.key);
+                return true;
+            }
+            return false;
+        });
+        if (shouldBatchDelete) {
+            return client.batchDelete('sproxyd', keys, log, cb);
+        }
+        return async.eachLimit(locations, 5, (loc, next) => {
             process.nextTick(() => data.delete(loc, log, next));
         },
         err => {
             if (err) {
-                log.error('batch delete failed', { error: err });
-            } else {
-                log.trace('batch delete successfully completed');
+                log.end().error('batch delete failed', { error: err });
+                return cb(err);
             }
-            log.end();
+            log.end().trace('batch delete successfully completed');
+            return cb();
         });
     },
```
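Review note: the sproxyd fast path only engages when every location is either a raw key string or a sproxyd location object; a single foreign location falls back to per-location `async.eachLimit` deletes. The predicate from the diff, isolated into a runnable sketch (the wrapper function name is hypothetical):

```js
// The fast-path predicate from the diff, isolated; the wrapper name is
// hypothetical. Keys are collected while verifying every location is
// addressable by the sproxyd batch route.
function sproxydKeysIfBatchable(locations) {
    const keys = [];
    const shouldBatchDelete = locations.every(l => {
        if (typeof l === 'string') {
            keys.push(l);
            return true;
        }
        if (l.dataStoreName === 'sproxyd') {
            keys.push(l.key);
            return true;
        }
        return false; // any other backend disables the batch route
    });
    return shouldBatchDelete ? keys : null;
}

console.log(sproxydKeysIfBatchable(
    ['rawKey', { dataStoreName: 'sproxyd', key: 'k2' }])); // ['rawKey', 'k2']
console.log(sproxydKeysIfBatchable(
    [{ dataStoreName: 'awsbackend', key: 'k3' }])); // null => per-key deletes
```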
```diff
@@ -250,16 +250,21 @@ const services = {
             if (err) {
                 return cb(err, res);
             }
-            cb(null, res); // this is smart
             log.trace('deleteObject: metadata delete OK');
             const deleteLog = logger.newRequestLogger();
             if (objectMD.location === null) {
-                return undefined;
+                return cb(null, res);
             } else if (!Array.isArray(objectMD.location)) {
-                return data.delete(objectMD.location, deleteLog);
+                data.delete(objectMD.location, deleteLog);
+                return cb(null, res);
             }
             return data.batchDelete(objectMD.location, null, null,
-                deleteLog);
+                deleteLog, err => {
+                    if (err) {
+                        return cb(err);
+                    }
+                    return cb(null, res);
+                });
         });
     }
```
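Review note: `deleteObject` previously answered the client as soon as metadata was removed (`cb(null, res); // this is smart`) and let data deletion race on unobserved. A condensed sketch of the new ordering (stubs hypothetical):

```js
// Condensed sketch of the new ordering (stubs hypothetical): the client
// callback now waits for the data-side batch delete instead of racing it.
function batchDelete(locations, method, dataStoreName, log, cb) {
    setImmediate(cb); // stand-in for data.batchDelete
}

function deleteObject(objectMD, res, cb) {
    if (objectMD.location === null) {
        return cb(null, res); // nothing stored, answer immediately
    }
    return batchDelete(objectMD.location, null, null, console, err => {
        if (err) {
            return cb(err); // data-deletion failures now reach the caller
        }
        return cb(null, res);
    });
}

deleteObject({ location: ['k1', 'k2'] }, { statusCode: 204 },
    (err, res) => console.log(err, res));
```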
```diff
@@ -31,7 +31,7 @@
     "mongodb": "^2.2.31",
     "node-uuid": "^1.4.3",
     "npm-run-all": "~4.1.5",
-    "sproxydclient": "scality/sproxydclient#6a391f8d",
+    "sproxydclient": "scality/sproxydclient#a6ec980",
     "utapi": "scality/utapi#178666f",
     "utf8": "~2.1.1",
     "uuid": "^3.0.1",
```