Compare commits

...

4 Commits

Author SHA1 Message Date
Jordi Bertran de Balanda 11e1cccd14 CLDSRV-174 - update Arsenal 2022-05-05 21:36:35 +02:00
Ronnie Smith 97fca50504
feature: CLDSRV-162 use data retrieval params over fn 2022-04-27 10:22:37 -07:00
Ronnie Smith 5a6b01c4d5
feature: CLDSRV-162 update bad version ids to be proper 2022-04-26 13:15:07 -07:00
Ronnie Smith fdb2511b81
feature: CLDSRV-162 use metadata and data from new arsenal 2022-04-26 00:11:40 -07:00
131 changed files with 1319 additions and 6096 deletions

View File

@ -7,7 +7,7 @@ COPY . /usr/src/app
RUN apt-get update \
&& apt-get install -y jq python git build-essential --no-install-recommends \
&& yarn install --production --network-concurrency 1 \
&& yarn install --production \
&& apt-get autoremove --purge -y python git build-essential \
&& rm -rf /var/lib/apt/lists/* \
&& yarn cache clean \

View File

@ -174,10 +174,6 @@ const constants = {
'bucket',
],
allowedUtapiEventFilterStates: ['allow', 'deny'],
// The AWS assumed Role resource type
assumedRoleArnResourceType: 'assumed-role',
// Session name of the backbeat lifecycle assumed role session.
backbeatLifecycleSessionName: 'backbeat-lifecycle',
};
module.exports = constants;

View File

@ -24,9 +24,60 @@ const versionIdUtils = versioning.VersionID;
const defaultHealthChecks = { allowFrom: ['127.0.0.1/8', '::1'] };
const defaultLocalCache = { host: '127.0.0.1', port: 6379 };
const defaultExternalBackendsConfig = {
// eslint-disable-next-line camelcase
aws_s3: {
httpAgent: {
keepAlive: false,
keepAliveMsecs: 1000,
maxFreeSockets: 256,
maxSockets: null,
},
},
gcp: {
httpAgent: {
keepAlive: true,
keepAliveMsecs: 1000,
maxFreeSockets: 256,
maxSockets: null,
},
},
};
const gcpScope = 'https://www.googleapis.com/auth/cloud-platform';
function assertCertPaths(key, cert, ca, basePath) {
const certObj = {};
certObj.paths = {};
certObj.certs = {};
if (key) {
const keypath = key.startsWith('/') ? key : `${basePath}/${key}`;
assert.doesNotThrow(() =>
fs.accessSync(keypath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${keypath}`);
certObj.paths.key = keypath;
certObj.certs.key = fs.readFileSync(keypath, 'ascii');
}
if (cert) {
const certpath = cert.startsWith('/') ? cert : `${basePath}/${cert}`;
assert.doesNotThrow(() =>
fs.accessSync(certpath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${certpath}`);
certObj.paths.cert = certpath;
certObj.certs.cert = fs.readFileSync(certpath, 'ascii');
}
if (ca) {
const capath = ca.startsWith('/') ? ca : `${basePath}/${ca}`;
assert.doesNotThrow(() =>
fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`);
certObj.paths.ca = capath;
certObj.certs.ca = fs.readFileSync(capath, 'ascii');
}
return certObj;
}
function parseSproxydConfig(configSproxyd) {
const joiSchema = joi.object({
bootstrap: joi.array().items(joi.string()).min(1),
@ -1124,6 +1175,58 @@ class Config extends EventEmitter {
'certFilePaths.cert must be defined');
}
this.outboundProxy = {};
const envProxy = process.env.HTTP_PROXY || process.env.HTTPS_PROXY
|| process.env.http_proxy || process.env.https_proxy;
const p = config.outboundProxy;
const proxyUrl = envProxy || (p ? p.url : '');
if (proxyUrl) {
assert(typeof proxyUrl === 'string',
'bad proxy config: url must be a string');
const { protocol, hostname, port, auth } = url.parse(proxyUrl);
assert(protocol === 'http:' || protocol === 'https:',
'bad proxy config: protocol must be http or https');
assert(typeof hostname === 'string' && hostname !== '',
'bad proxy config: hostname must be a non-empty string');
if (port) {
const portInt = Number.parseInt(port, 10);
assert(!Number.isNaN(portInt) && portInt > 0,
'bad proxy config: port must be a number greater than 0');
}
if (auth) {
assert(typeof auth === 'string',
'bad proxy config: auth must be string');
const authArray = auth.split(':');
assert(authArray.length === 2 && authArray[0].length > 0
&& authArray[1].length > 0, 'bad proxy config: ' +
'auth must be of format username:password');
}
this.outboundProxy.url = proxyUrl;
this.outboundProxy.certs = {};
const envCert = process.env.HTTPS_PROXY_CERTIFICATE;
const key = p ? p.key : '';
const cert = p ? p.cert : '';
const caBundle = envCert || (p ? p.caBundle : '');
if (p) {
assert(typeof p === 'object',
'bad config: "proxy" should be an object');
}
if (key) {
assert(typeof key === 'string',
'bad config: proxy.key should be a string');
}
if (cert) {
assert(typeof cert === 'string',
'bad config: proxy.cert should be a string');
}
if (caBundle) {
assert(typeof caBundle === 'string',
'bad config: proxy.caBundle should be a string');
}
const certObj = assertCertPaths(key, cert, caBundle, this._basePath);
this.outboundProxy.certs = certObj.certs;
}
// Ephemeral token to protect the reporting endpoint:
// try inherited from parent first, then hardcoded in conf file,
// then create a fresh one as last resort.
@ -1132,6 +1235,40 @@ class Config extends EventEmitter {
config.reportToken ||
uuidv4();
// External backends
// Currently supports configuring httpAgent(s) for keepAlive
this.externalBackends = defaultExternalBackendsConfig;
if (config.externalBackends) {
const extBackendsConfig = Object.keys(config.externalBackends);
extBackendsConfig.forEach(b => {
// assert that it's a valid backend
assert(externalBackends[b] !== undefined,
`bad config: ${b} is not one of valid external backends: ` +
`${Object.keys(externalBackends).join(', ')}`);
const { httpAgent } = config.externalBackends[b];
assert(typeof httpAgent === 'object',
`bad config: ${b} must have httpAgent object defined`);
const { keepAlive, keepAliveMsecs, maxFreeSockets, maxSockets }
= httpAgent;
assert(typeof keepAlive === 'boolean',
`bad config: ${b}.httpAgent.keepAlive must be a boolean`);
assert(typeof keepAliveMsecs === 'number' &&
httpAgent.keepAliveMsecs > 0,
`bad config: ${b}.httpAgent.keepAliveMsecs must be` +
' a number > 0');
assert(typeof maxFreeSockets === 'number' &&
httpAgent.maxFreeSockets >= 0,
`bad config: ${b}.httpAgent.maxFreeSockets must be ` +
'a number >= 0');
assert((typeof maxSockets === 'number' && maxSockets >= 0) ||
maxSockets === null,
`bad config: ${b}.httpAgent.maxFreeSockets must be ` +
'null or a number >= 0');
Object.assign(this.externalBackends[b].httpAgent, httpAgent);
});
}
// requests-proxy configuration
this.requests = {
viaProxy: false,

View File

@ -1,8 +1,7 @@
const { evaluators, actionMaps, RequestContext } = require('arsenal').policies;
const constants = require('../../../../constants');
const { allAuthedUsersId, bucketOwnerActions, logId, publicId,
assumedRoleArnResourceType, backbeatLifecycleSessionName } = constants;
const { allAuthedUsersId, bucketOwnerActions, logId, publicId } = constants;
// whitelist buckets to allow public read on objects
const publicReadBuckets = process.env.ALLOW_PUBLIC_READ_BUCKETS ?
@ -365,34 +364,10 @@ function validatePolicyResource(bucketName, policy) {
});
}
/** isLifecycleSession - check if it is the Lifecycle assumed role session arn.
* @param {string} arn - Amazon resource name - example:
* arn:aws:sts::257038443293:assumed-role/rolename/backbeat-lifecycle
* @return {boolean} true if Lifecycle assumed role session arn, false if not.
*/
function isLifecycleSession(arn) {
if (!arn) {
return false;
}
const arnSplits = arn.split(':');
const service = arnSplits[2];
const resourceNames = arnSplits[arnSplits.length - 1].split('/');
const resourceType = resourceNames[0];
const sessionName = resourceNames[resourceNames.length - 1];
return (service === 'sts' &&
resourceType === assumedRoleArnResourceType &&
sessionName === backbeatLifecycleSessionName);
}
module.exports = {
isBucketAuthorized,
isObjAuthorized,
checkBucketAcls,
checkObjectAcls,
validatePolicyResource,
isLifecycleSession,
};

View File

@ -49,7 +49,7 @@ function updateRequestContexts(request, requestContexts, apiMethod, log, cb) {
return metadata.getObjectMD(bucketName, objectKey, { versionId: reqVersionId }, log,
(err, objMD) => {
if (err) {
if (err.NoSuchKey) {
if (err.is.NoSuchKey) {
return next();
}
log.trace('error getting request object tags');

View File

@ -22,7 +22,7 @@ function addToUsersBucket(canonicalID, bucketName, log, cb) {
// Get new format usersBucket to see if it exists
return metadata.getBucket(usersBucket, log, (err, usersBucketAttrs) => {
if (err && !err.NoSuchBucket && !err.BucketAlreadyExists) {
if (err && !err.is.NoSuchBucket && !err.is.BucketAlreadyExists) {
return cb(err);
}
const splitter = usersBucketAttrs ?
@ -36,7 +36,7 @@ function addToUsersBucket(canonicalID, bucketName, log, cb) {
usersBucket : oldUsersBucket;
return metadata.putObjectMD(usersBucketBeingCalled, key,
omVal, {}, log, err => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
// There must be no usersBucket so createBucket
// one using the new format
log.trace('users bucket does not exist, ' +
@ -56,9 +56,7 @@ function addToUsersBucket(canonicalID, bucketName, log, cb) {
// from getting a BucketAlreadyExists
// error with respect
// to the usersBucket.
if (err &&
err !==
errors.BucketAlreadyExists) {
if (err && !err.is.BucketAlreadyExists) {
log.error('error from metadata', {
error: err,
});
@ -206,7 +204,7 @@ function createBucket(authInfo, bucketName, headers,
},
getAnyExistingBucketInfo: function getAnyExistingBucketInfo(callback) {
metadata.getBucket(bucketName, log, (err, data) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return callback(null, 'NoBucketYet');
}
if (err) {

View File

@ -16,7 +16,7 @@ function _deleteMPUbucket(destinationBucketName, log, cb) {
`${mpuBucketPrefix}${destinationBucketName}`;
return metadata.deleteBucket(mpuBucketName, log, err => {
// If the mpu bucket does not exist, just move on
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return cb();
}
return cb(err);
@ -90,7 +90,7 @@ function deleteBucket(authInfo, bucketMD, bucketName, canonicalID, log, cb) {
log, (err, objectsListRes) => {
// If no shadow bucket ever created, no ongoing MPU's, so
// continue with deletion
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return next();
}
if (err) {

View File

@ -11,7 +11,7 @@ function deleteUserBucketEntry(bucketName, canonicalID, log, cb) {
metadata.deleteObjectMD(usersBucket, keyForUserBucket, {}, log, error => {
// If the object representing the bucket is not in the
// users bucket just continue
if (error && error.NoSuchKey) {
if (error && error.is.NoSuchKey) {
return cb(null);
// BACKWARDS COMPATIBILITY: Remove this once no longer
// have old user bucket format
@ -20,7 +20,7 @@ function deleteUserBucketEntry(bucketName, canonicalID, log, cb) {
oldSplitter, bucketName);
return metadata.deleteObjectMD(oldUsersBucket, keyForUserBucket2,
{}, log, error => {
if (error && !error.NoSuchKey) {
if (error && !error.is.NoSuchKey) {
log.error('from metadata while deleting user bucket',
{ error });
return cb(error);

View File

@ -1,12 +1,10 @@
const async = require('async');
const { config } = require('../../../Config');
const constants = require('../../../../constants');
const data = require('../../../data/wrapper');
const { data } = require('../../../data/wrapper');
const locationConstraintCheck = require('../object/locationConstraintCheck');
const { metadataValidateBucketAndObj } =
require('../../../metadata/metadataUtils');
const multipleBackendGateway = require('../../../data/multipleBackendGateway');
const services = require('../../../services');
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
@ -55,35 +53,19 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
return next(err, mpuBucket, mpuOverviewObj, destBucket);
});
},
function ifMultipleBackend(mpuBucket, mpuOverviewObj, destBucket,
function abortExternalMpu(mpuBucket, mpuOverviewObj, destBucket,
next) {
if (config.backends.data === 'multiple') {
let location;
// if controlling location constraint is not stored in object
// metadata, mpu was initiated in legacy S3C, so need to
// determine correct location constraint
if (!mpuOverviewObj.controllingLocationConstraint) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err, destBucket);
});
}
location = backendInfoObj.controllingLC;
} else {
location = mpuOverviewObj.controllingLocationConstraint;
}
return multipleBackendGateway.abortMPU(objectKey, uploadId,
location, bucketName, log, (err, skipDataDelete) => {
const location = mpuOverviewObj.controllingLocationConstraint;
return data.abortMPU(objectKey, uploadId, location, bucketName,
request, destBucket, locationConstraintCheck, log,
(err, skipDataDelete) => {
if (err) {
return next(err, destBucket);
}
return next(null, mpuBucket, destBucket,
skipDataDelete);
// for Azure and GCP we do not need to delete data
// for all other backends, skipDataDelete will be set to false
return next(null, mpuBucket, destBucket, skipDataDelete);
});
}
return next(null, mpuBucket, destBucket, false);
},
function sendAbortPut(mpuBucket, destBucket, skipDataDelete, next) {
services.sendAbortMPUPut(bucketName, objectKey, uploadId, log,

View File

@ -3,7 +3,7 @@ const { errors, s3middleware } = require('arsenal');
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
const constants = require('../../../../constants');
const data = require('../../../data/wrapper');
const { data } = require('../../../data/wrapper');
const services = require('../../../services');
const logger = require('../../../utilities/logger');
const { dataStore } = require('./storeObject');
@ -136,10 +136,9 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
size,
headers,
isDeleteMarker,
replicationInfo: getReplicationInfo(objectKey, bucketMD, false, size, null, null, authInfo, isDeleteMarker),
replicationInfo: getReplicationInfo(objectKey, bucketMD, false, size),
log,
};
if (!isDeleteMarker) {
metadataStoreParams.contentType = request.headers['content-type'];
metadataStoreParams.cacheControl = request.headers['cache-control'];
@ -239,7 +238,7 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
if (err) {
// TODO: check AWS error when user requested a specific
// version before any versions have been put
const logLvl = err === errors.BadRequest ?
const logLvl = err.is.BadRequest ?
'debug' : 'error';
log[logLvl]('error getting versioning info', {
error: err,

View File

@ -1,5 +1,4 @@
const s3config = require('../../../Config').config;
const { isLifecycleSession } = require('../authorization/permissionChecks.js');
function _getBackend(objectMD, site) {
const backends = objectMD ? objectMD.replicationInfo.backends : [];
@ -64,22 +63,14 @@ function _getReplicationInfo(rule, replicationConfig, content, operationType,
* @param {boolean} objSize - The size, in bytes, of the object being PUT
* @param {string} operationType - The type of operation to replicate
* @param {object} objectMD - The object metadata
* @param {AuthInfo} [authInfo] - authentication info of object owner
* @param {boolean} [isDeleteMarker] - whether creating a delete marker
* @return {undefined}
*/
function getReplicationInfo(objKey, bucketMD, isMD, objSize, operationType,
objectMD, authInfo, isDeleteMarker) {
objectMD) {
const content = isMD || objSize === 0 ? ['METADATA'] : ['DATA', 'METADATA'];
const config = bucketMD.getReplicationConfiguration();
// If bucket does not have a replication configuration, do not replicate.
if (config) {
// If delete an object due to a lifecycle action,
// the delete marker is not replicated to the destination buckets.
if (isDeleteMarker && authInfo && isLifecycleSession(authInfo.getArn())) {
return undefined;
}
const rule = config.rules.find(rule =>
(objKey.startsWith(rule.prefix) && rule.enabled));
if (rule) {

View File

@ -8,13 +8,12 @@
*
* @param {array|string|null} prev - list of keys from the object being
* overwritten
* @param {array|null} curr - list of keys to be used in composing
* current object
* @param {array} curr - list of keys to be used in composing current object
* @returns {boolean} true if no key in `curr` is present in `prev`,
* false otherwise
*/
function locationKeysHaveChanged(prev, curr) {
if (!prev || prev.length === 0 || !curr) {
if (!prev || prev.length === 0) {
return true;
}
// backwards compatibility check if object is of model version 2

View File

@ -0,0 +1,48 @@
const { errors } = require('arsenal');
const { config } = require('../../../Config');
const { getLocationMetric, pushLocationMetric } =
require('../../../utapi/utilities');
function _gbToBytes(gb) {
return gb * 1024 * 1024 * 1024;
}
/**
* locationStorageCheck - will ensure there is enough space left for object on
* PUT operations, or will update metric on DELETE
* NOTE: storage limit may not be exactly enforced in the case of concurrent
* requests when near limit
* @param {string} location - name of location to check quota
* @param {number} updateSize - new size to check against quota in bytes
* @param {object} log - werelogs logger
* @param {function} cb - callback function
* @return {undefined}
*/
function locationStorageCheck(location, updateSize, log, cb) {
const lc = config.locationConstraints;
const sizeLimitGB = lc[location] ? lc[location].sizeLimitGB : undefined;
if (updateSize === 0 || sizeLimitGB === undefined || sizeLimitGB === null) {
return cb();
}
// no need to list location metric, since it should be decreased
if (updateSize < 0) {
return pushLocationMetric(location, updateSize, log, cb);
}
return getLocationMetric(location, log, (err, bytesStored) => {
if (err) {
log.error(`Error listing metrics from Utapi: ${err.message}`);
return cb(err);
}
const newStorageSize = parseInt(bytesStored, 10) + updateSize;
const sizeLimitBytes = _gbToBytes(sizeLimitGB);
if (sizeLimitBytes < newStorageSize) {
return cb(errors.AccessDenied.customizeDescription(
`The assigned storage space limit for location ${location} ` +
'will be exceeded'));
}
return pushLocationMetric(location, updateSize, log, cb);
});
}
module.exports = locationStorageCheck;

View File

@ -2,7 +2,7 @@ const { errors } = require('arsenal');
const {
parseRangeSpec,
parseRange,
} = require('arsenal/lib/network/http/utils');
} = require('arsenal').network.http.utils;
const constants = require('../../../../constants');
const setPartRanges = require('./setPartRanges');

View File

@ -1,6 +1,6 @@
const { errors, jsutil } = require('arsenal');
const data = require('../../../data/wrapper');
const { data } = require('../../../data/wrapper');
const { prepareStream } = require('./prepareStream');
/**

View File

@ -292,7 +292,7 @@ function versioningPreprocessing(bucketName, bucketMD, objectKey, objMD,
// it's possible there was a concurrent request to
// delete the null version, so proceed with putting a
// new version
if (err === errors.NoSuchKey) {
if (err.is.NoSuchKey) {
return next(null, options);
}
return next(errors.InternalError);

View File

@ -8,16 +8,15 @@ const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
require('./apiUtils/object/processMpuParts');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const constants = require('../../constants');
const { versioningPreprocessing, checkQueryVersionId }
= require('./apiUtils/object/versioning');
const services = require('../services');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { skipMpuPartProcessing } = require('../data/external/utils');
const { skipMpuPartProcessing } = require('arsenal').storage.data.external.backendUtils;
const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const locationKeysHaveChanged
@ -158,22 +157,6 @@ function completeMultipartUpload(authInfo, request, log, callback) {
}
return next(errors.MalformedXML, destBucket);
},
function markOverviewForCompletion(destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey, next) {
return services.metadataMarkMPObjectForCompletion({
bucketName: mpuBucket.getName(),
objectKey,
uploadId,
splitter,
storedMetadata,
}, log, err => {
if (err) {
return next(err);
}
return next(null, destBucket, objMD, mpuBucket,
jsonList, storedMetadata, location, mpuOverviewKey);
});
},
function retrieveParts(destBucket, objMD, mpuBucket, jsonList,
storedMetadata, location, mpuOverviewKey, next) {
return services.getMPUparts(mpuBucket.getName(), uploadId, log,
@ -186,35 +169,21 @@ function completeMultipartUpload(authInfo, request, log, callback) {
jsonList, storedMetadata, location, mpuOverviewKey);
});
},
function ifMultipleBackend(destBucket, objMD, mpuBucket, storedParts,
function completeExternalMpu(destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, location, mpuOverviewKey, next) {
if (config.backends.data === 'multiple') {
// if mpu was initiated in legacy version
if (location === undefined) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err);
});
}
// eslint-disable-next-line no-param-reassign
location = backendInfoObj.controllingLC;
}
const mdInfo = { storedParts, mpuOverviewKey, splitter };
return multipleBackendGateway.completeMPU(objectKey,
uploadId, location, jsonList, mdInfo, bucketName, null, null,
log, (err, completeObjData) => {
const mpuInfo =
{ objectKey, uploadId, jsonList, bucketName, destBucket };
return data.completeMPU(request, mpuInfo, mdInfo, location,
null, null, null, locationConstraintCheck, log,
(err, completeObjData) => {
if (err) {
return next(err, destBucket);
}
// if mpu not handled externally, completeObjData will be null
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, completeObjData,
mpuOverviewKey);
jsonList, storedMetadata, completeObjData, mpuOverviewKey);
});
}
return next(null, destBucket, objMD, mpuBucket, storedParts,
jsonList, storedMetadata, null, mpuOverviewKey);
},
function validateAndFilterParts(destBucket, objMD, mpuBucket,
storedParts, jsonList, storedMetadata, completeObjData, mpuOverviewKey,
@ -341,8 +310,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
if (err) {
// TODO: check AWS error when user requested a specific
// version before any versions have been put
const logLvl = err === errors.BadRequest ?
'debug' : 'error';
const logLvl = err.is.BadRequest ? 'debug' : 'error';
log[logLvl]('error getting versioning info', {
error: err,
method: 'versioningPreprocessing',

View File

@ -14,15 +14,11 @@ const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const validateWebsiteHeader = require('./apiUtils/object/websiteServing')
.validateWebsiteHeader;
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const { validateHeaders, compareObjectLockInformation } =
require('./apiUtils/object/objectLockHelpers');
const { getObjectSSEConfiguration } = require('./apiUtils/bucket/bucketEncryption');
const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
const externalVersioningErrorMessage = 'We do not currently support putting ' +
'a versioned object to a location-constraint of type Azure.';
const { data } = require('../data/wrapper');
/*
Sample xml response:
@ -226,48 +222,35 @@ function initiateMultipartUpload(authInfo, request, log, callback) {
}
let uploadId;
if (config.backends.data === 'multiple') {
return multipleBackendGateway.createMPU(objectKey, metaHeaders,
bucketName, websiteRedirectHeader, locConstraint, undefined,
undefined, undefined, undefined, tagging, log,
(err, dataBackendResObj) => {
const mpuInfo = {
objectKey,
metaHeaders,
bucketName,
locConstraint,
destinationBucket,
tagging,
};
return data.initiateMPU(mpuInfo, websiteRedirectHeader, log,
(err, dataBackendResObj, isVersionedObj) => {
// will return as true and a custom error if external backend does
// not support versioned objects
if (isVersionedObj) {
return callback(err);
}
if (err) {
return callback(err);
}
if (locConstraint &&
config.locationConstraints[locConstraint] &&
config.locationConstraints[locConstraint].type &&
constants.versioningNotImplBackends[config
.locationConstraints[locConstraint].type]
) {
const vcfg = destinationBucket.getVersioningConfiguration();
const isVersionedObj = vcfg && vcfg.Status === 'Enabled';
if (isVersionedObj) {
log.debug(externalVersioningErrorMessage,
{ method: 'initiateMultipartUpload',
error: errors.NotImplemented });
return callback(errors.NotImplemented
.customizeDescription(externalVersioningErrorMessage));
}
}
// if mpu not handled externally, dataBackendResObj will be null
if (dataBackendResObj) {
// dataBackendResObj will be returned in data backend
// handles mpu
uploadId = dataBackendResObj.UploadId;
} else {
// Generate uniqueID without dashes so routing not messed up
uploadId = uuidv4().replace(/-/g, '');
}
return _getMPUBucket(destinationBucket, log, corsHeaders,
uploadId, cipherBundle, locConstraint, callback);
});
}
// Generate uniqueID without dashes so that routing not messed up
uploadId = uuidv4().replace(/-/g, '');
return _getMPUBucket(destinationBucket, log, corsHeaders,
uploadId, cipherBundle, locConstraint, callback);
}
async.waterfall([
next => metadataValidateBucketAndObj(metadataValParams, log,

View File

@ -3,6 +3,7 @@ const async = require('async');
const { errors, s3middleware } = require('arsenal');
const { data } = require('../data/wrapper');
const constants = require('../../constants');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const locationConstraintCheck =
@ -12,8 +13,6 @@ const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const escapeForXml = s3middleware.escapeForXml;
const { pushMetric } = require('../utapi/utilities');
const { config } = require('../../lib/Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
/*
@ -142,37 +141,25 @@ function listParts(authInfo, request, log, callback) {
});
},
function waterfall3(destBucket, mpuBucket, mpuOverviewObj, next) {
if (config.backends.data === 'multiple') {
let location;
// if controlling location constraint is not stored in object
// metadata, mpu was initiated in legacy S3C, so need to
// determine correct location constraint
if (!mpuOverviewObj.controllingLocationConstraint) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err, destBucket);
});
}
location = backendInfoObj.controllingLC;
} else {
location = mpuOverviewObj.controllingLocationConstraint;
}
return multipleBackendGateway.listParts(objectKey, uploadId,
location, bucketName, partNumberMarker, maxParts, log,
(err, backendPartList) => {
const mpuInfo = {
objectKey,
uploadId,
bucketName,
partNumberMarker,
maxParts,
mpuOverviewObj,
destBucket,
};
return data.listParts(mpuInfo, request, locationConstraintCheck,
log, (err, backendPartList) => {
if (err) {
return next(err, destBucket);
} else if (backendPartList) {
return next(null, destBucket, mpuBucket,
mpuOverviewObj, backendPartList);
}
// if external backend doesn't handle mpu, backendPartList
// will be null
return next(null, destBucket, mpuBucket, mpuOverviewObj,
null);
backendPartList);
});
}
return next(null, destBucket, mpuBucket, mpuOverviewObj, null);
},
function waterfall4(destBucket, mpuBucket, mpuOverviewObj,
backendPartList, next) {

View File

@ -210,10 +210,10 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
(versionId, callback) => metadataGetObject(bucketName, entry.key,
versionId, log, (err, objMD) => {
// if general error from metadata return error
if (err && !err.NoSuchKey) {
if (err && !err.is.NoSuchKey) {
return callback(err);
}
if (err && err.NoSuchKey) {
if (err && err.is.NoSuchKey) {
const verCfg = bucket.getVersioningConfiguration();
// To adhere to AWS behavior, create a delete marker
// if trying to delete an object that does not exist
@ -386,7 +386,7 @@ function multiObjectDelete(authInfo, request, log, callback) {
return vault.checkPolicies(requestContextParams, authInfo.getArn(),
log, (err, authorizationResults) => {
// there were no policies so received a blanket AccessDenied
if (err && err.AccessDenied) {
if (err && err.is.AccessDenied) {
objects.forEach(entry => {
errorResults.push({
entry,

View File

@ -1,5 +1,3 @@
const { errors } = require('arsenal');
const abortMultipartUpload = require('./apiUtils/object/abortMultipartUpload');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const isLegacyAWSBehavior = require('../utilities/legacyAWSBehavior');
@ -29,10 +27,10 @@ function multipartDelete(authInfo, request, log, callback) {
request.method, destinationBucket);
const location = destinationBucket ?
destinationBucket.getLocationConstraint() : null;
if (err && err !== errors.NoSuchUpload) {
if (err && !err.is.NoSuchUpload) {
return callback(err, corsHeaders);
}
if (err === errors.NoSuchUpload && isLegacyAWSBehavior(location)) {
if (err && err.is.NoSuchUpload && isLegacyAWSBehavior(location)) {
log.trace('did not find valid mpu with uploadId', {
method: 'multipartDelete',
uploadId,

View File

@ -11,7 +11,7 @@ const locationConstraintCheck
const { checkQueryVersionId, versioningPreprocessing }
= require('./apiUtils/object/versioning');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const logger = require('../utilities/logger');
const services = require('../services');
const { pushMetric } = require('../utapi/utilities');

View File

@ -9,8 +9,7 @@ const { pushMetric } = require('../utapi/utilities');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const metadata = require('../metadata/wrapper');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const { data } = require('../data/wrapper');
const REPLICATION_ACTION = 'DELETE_TAGGING';
/**
@ -84,13 +83,10 @@ function objectDeleteTagging(authInfo, request, log, callback) {
log, err =>
next(err, bucket, objectMD));
},
(bucket, objectMD, next) => {
if (config.backends.data === 'multiple') {
return multipleBackendGateway.objectTagging('Delete', objectKey,
bucket, objectMD, log, err => next(err, bucket, objectMD));
}
return next(null, bucket, objectMD);
},
(bucket, objectMD, next) =>
// if external backends handles tagging
data.objectTagging('Delete', objectKey, bucket, objectMD,
log, err => next(err, bucket, objectMD)),
], (err, bucket, objectMD) => {
const additionalResHeaders = collectCorsHeaders(request.headers.origin,
request.method, bucket);

View File

@ -1,7 +1,7 @@
const { errors, s3middleware } = require('arsenal');
const { parseRange } = require('arsenal/lib/network/http/utils');
const { parseRange } = require('arsenal').network.http.utils;
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const { decodeVersionId } = require('./apiUtils/object/versioning');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');

View File

@ -1,6 +1,6 @@
const { errors, s3middleware } = require('arsenal');
const validateHeaders = s3middleware.validateConditionalHeaders;
const { parseRange } = require('arsenal/lib/network/http/utils');
const { parseRange } = require('arsenal').network.http.utils;
const { decodeVersionId } = require('./apiUtils/object/versioning');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
@ -130,12 +130,10 @@ function objectHead(authInfo, request, log, callback) {
return callback(errors.BadRequest, corsHeaders);
}
const partSize = getPartSize(objMD, partNumber);
const isEmptyObject = objLength === 0;
if (!partSize && !isEmptyObject) {
if (!partSize) {
return callback(errors.InvalidRange, corsHeaders);
}
responseHeaders['content-length'] = isEmptyObject ? 0 : partSize;
responseHeaders['content-length'] = partSize;
const partsCount = getPartCountFromMd5(objMD);
if (partsCount) {
responseHeaders['x-amz-mp-parts-count'] = partsCount;

View File

@ -4,7 +4,9 @@ const validateHeaders = s3middleware.validateConditionalHeaders;
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const constants = require('../../constants');
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const locationConstraintCheck =
require('./apiUtils/object/locationConstraintCheck');
const metadata = require('../metadata/wrapper');
const { pushMetric } = require('../utapi/utilities');
const logger = require('../utilities/logger');
@ -182,7 +184,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
sourceLocationConstraintName, next) {
return metadata.getBucket(mpuBucketName, log,
(err, mpuBucket) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return next(errors.NoSuchUpload);
}
if (err) {
@ -211,7 +213,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey,
null, log, (err, res) => {
if (err) {
if (err.NoSuchKey) {
if (err.is.NoSuchKey) {
return next(errors.NoSuchUpload);
}
log.error('error getting overview object from ' +
@ -232,38 +234,50 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
res.controllingLocationConstraint;
return next(null, dataLocator, destBucketMD,
destObjLocationConstraint, copyObjectSize,
sourceVerId, sourceLocationConstraintName, splitter);
sourceVerId, sourceLocationConstraintName);
});
},
function goGetData(dataLocator, destBucketMD,
destObjLocationConstraint, copyObjectSize, sourceVerId,
sourceLocationConstraintName, splitter, next) {
data.uploadPartCopy(request, log, destBucketMD,
function goGetData(
dataLocator,
destBucketMD,
destObjLocationConstraint,
copyObjectSize,
sourceVerId,
sourceLocationConstraintName,
destObjLocationConstraint, dataLocator, dataStoreContext,
next,
) {
data.uploadPartCopy(
request,
log,
destBucketMD,
sourceLocationConstraintName,
destObjLocationConstraint,
dataLocator,
dataStoreContext,
locationConstraintCheck,
(error, eTag, lastModified, serverSideEncryption, locations) => {
if (error) {
if (error.message === 'skip') {
return next(skipError, destBucketMD, eTag,
lastModified, sourceVerId,
serverSideEncryption, lastModified, splitter);
serverSideEncryption);
}
return next(error, destBucketMD);
}
return next(null, destBucketMD, locations, eTag,
copyObjectSize, sourceVerId, serverSideEncryption,
lastModified, splitter);
lastModified);
});
},
function getExistingPartInfo(destBucketMD, locations, totalHash,
copyObjectSize, sourceVerId, serverSideEncryption, lastModified,
splitter, next) {
next) {
const partKey =
`${uploadId}${constants.splitter}${paddedPartNumber}`;
metadata.getObjectMD(mpuBucketName, partKey, {}, log,
(err, result) => {
// If there is nothing being overwritten just move on
if (err && !err.NoSuchKey) {
if (err && !err.is.NoSuchKey) {
log.debug('error getting current part (if any)',
{ error: err });
return next(err);
@ -281,12 +295,12 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
}
return next(null, destBucketMD, locations, totalHash,
prevObjectSize, copyObjectSize, sourceVerId,
serverSideEncryption, lastModified, oldLocations, splitter);
serverSideEncryption, lastModified, oldLocations);
});
},
function storeNewPartMetadata(destBucketMD, locations, totalHash,
prevObjectSize, copyObjectSize, sourceVerId, serverSideEncryption,
lastModified, oldLocations, splitter, next) {
lastModified, oldLocations, next) {
const metaStoreParams = {
partNumber: paddedPartNumber,
contentMD5: totalHash,
@ -302,58 +316,20 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
{ error: err, method: 'storeNewPartMetadata' });
return next(err);
}
return next(null, locations, oldLocations, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize, splitter);
});
},
function checkCanDeleteOldLocations(partLocations, oldLocations, destBucketMD,
totalHash, lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize, splitter, next) {
if (!oldLocations) {
return next(null, oldLocations, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize);
}
return services.isCompleteMPUInProgress({
bucketName: destBucketName,
objectKey: destObjectKey,
uploadId,
splitter,
}, log, (err, completeInProgress) => {
if (err) {
return next(err, destBucketMD);
}
let oldLocationsToDelete = oldLocations;
// Prevent deletion of old data if a completeMPU
// is already in progress because then there is no
// guarantee that the old location will not be the
// committed one.
if (completeInProgress) {
log.warn('not deleting old locations because CompleteMPU is in progress', {
method: 'objectPutCopyPart::checkCanDeleteOldLocations',
bucketName: destBucketName,
objectKey: destObjectKey,
uploadId,
partLocations,
oldLocations,
});
oldLocationsToDelete = null;
}
return next(null, oldLocationsToDelete, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize);
});
},
function cleanupExistingData(oldLocationsToDelete, destBucketMD, totalHash,
function cleanupExistingData(oldLocations, destBucketMD, totalHash,
lastModified, sourceVerId, serverSideEncryption,
prevObjectSize, copyObjectSize, next) {
// Clean up the old data now that new metadata (with new
// data locations) has been stored
if (oldLocationsToDelete) {
if (oldLocations) {
const delLog = logger.newRequestLoggerFromSerializedUids(
log.getSerializedUids());
return data.batchDelete(oldLocationsToDelete, request.method, null,
return data.batchDelete(oldLocations, request.method, null,
delLog, err => {
if (err) {
// if error, log the error and move on as it is not

View File

@ -5,7 +5,7 @@ const { errors } = require('arsenal');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const { BackendInfo } = require('./apiUtils/object/BackendInfo');
const constants = require('../../constants');
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const { dataStore } = require('./apiUtils/object/storeObject');
const { isBucketAuthorized } =
require('./apiUtils/authorization/permissionChecks');
@ -13,9 +13,6 @@ const kms = require('../kms/wrapper');
const metadata = require('../metadata/wrapper');
const { pushMetric } = require('../utapi/utilities');
const logger = require('../utilities/logger');
const services = require('../services');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const writeContinue = require('../utilities/writeContinue');
@ -94,7 +91,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
// Get the destination bucket.
next => metadata.getBucket(bucketName, log,
(err, destinationBucket) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return next(errors.NoSuchBucket, destinationBucket);
}
if (err) {
@ -142,7 +139,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
(destinationBucket, cipherBundle, next) =>
metadata.getBucket(mpuBucketName, log,
(err, mpuBucket) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
return next(errors.NoSuchUpload, destinationBucket);
}
if (err) {
@ -190,59 +187,32 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
// no need to store part info in metadata
(destinationBucket, objectLocationConstraint, cipherBundle,
splitter, next) => {
if (config.backends.data === 'multiple') {
// if mpu was initiated in legacy version
if (objectLocationConstraint === undefined) {
const backendInfoObj = locationConstraintCheck(request,
null, destinationBucket, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
next(backendInfoObj.err);
});
}
// eslint-disable-next-line no-param-reassign
objectLocationConstraint = backendInfoObj.controllingLC;
}
if (!multipleBackendGateway.isClientHandleMpu(
objectLocationConstraint)) {
// if data backend doesn't handle MPU, continue waterfall
return next(null, destinationBucket,
objectLocationConstraint, cipherBundle, splitter, null);
}
const mpuInfo = {
destinationBucket,
size,
objectKey,
uploadId,
partNumber,
bucketName,
};
writeContinue(request, request._response);
return multipleBackendGateway.uploadPart(request,
streamingV4Params, null, size, objectLocationConstraint,
objectKey, uploadId, partNumber, bucketName, log,
(err, partInfo) => {
return data.putPart(request, mpuInfo, streamingV4Params,
objectLocationConstraint, locationConstraintCheck, log,
(err, partInfo, updatedObjectLC) => {
if (err) {
log.error('error putting part to data backend', {
error: err,
method:
'objectPutPart::multipleBackendGateway.uploadPart',
});
return next(err, destinationBucket);
} else if (partInfo &&
partInfo.dataStoreType === 'aws_s3') {
// if data backend handles MPU, skip to end of waterfall
}
// if data backend handles mpu, skip to end of waterfall
if (partInfo && partInfo.dataStoreType === 'aws_s3') {
return next(skipError, destinationBucket,
partInfo.dataStoreETag);
} else if (partInfo && partInfo.dataStoreType === 'azure') {
return next(null, destinationBucket,
objectLocationConstraint, cipherBundle, splitter,
partInfo);
}
let msg = 'backend is managing MPU but was';
msg += ' not handle after uploadPart';
log.error(msg, {
error: errors.InternalError,
method:
'objectPutPart::multipleBackendGateway.uploadPart',
// partInfo will be null if data backend is not external
// if the object location constraint undefined because
// mpu was initiated in legacy version, update it
return next(null, destinationBucket, updatedObjectLC,
cipherBundle, splitter, partInfo);
});
return next(errors.InternalError, destinationBucket);
});
}
return next(null, destinationBucket, objectLocationConstraint,
cipherBundle, splitter, null);
},
// Get any pre-existing part.
(destinationBucket, objectLocationConstraint, cipherBundle,
@ -252,7 +222,7 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
return metadata.getObjectMD(mpuBucketName, partKey, {}, log,
(err, res) => {
// If there is no object with the same key, continue.
if (err && !err.NoSuchKey) {
if (err && !err.is.NoSuchKey) {
log.error('error getting current part (if any)', {
error: err,
method: 'objectPutPart::metadata.getObjectMD',
@ -273,19 +243,19 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
}
return next(null, destinationBucket,
objectLocationConstraint, cipherBundle,
partKey, prevObjectSize, oldLocations, partInfo, splitter);
partKey, prevObjectSize, oldLocations, partInfo);
});
},
// Store in data backend.
(destinationBucket, objectLocationConstraint, cipherBundle,
partKey, prevObjectSize, oldLocations, partInfo, splitter, next) => {
partKey, prevObjectSize, oldLocations, partInfo, next) => {
// NOTE: set oldLocations to null so we do not batchDelete for now
if (partInfo && partInfo.dataStoreType === 'azure') {
// skip to storing metadata
return next(null, destinationBucket, partInfo,
partInfo.dataStoreETag,
cipherBundle, partKey, prevObjectSize, null,
objectLocationConstraint, splitter);
objectLocationConstraint);
}
const objectContext = {
bucketName,
@ -305,13 +275,12 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
}
return next(null, destinationBucket, dataGetInfo, hexDigest,
cipherBundle, partKey, prevObjectSize, oldLocations,
objectLocationConstraint, splitter);
objectLocationConstraint);
});
},
// Store data locations in metadata and delete any overwritten
// data if completeMPU hasn't been initiated yet.
// Store data locations in metadata and delete any overwritten data.
(destinationBucket, dataGetInfo, hexDigest, cipherBundle, partKey,
prevObjectSize, oldLocations, objectLocationConstraint, splitter, next) => {
prevObjectSize, oldLocations, objectLocationConstraint, next) => {
// Use an array to be consistent with objectPutCopyPart where there
// could be multiple locations.
const partLocations = [dataGetInfo];
@ -341,54 +310,19 @@ function objectPutPart(authInfo, request, streamingV4Params, log,
});
return next(err, destinationBucket);
}
return next(null, partLocations, oldLocations, objectLocationConstraint,
destinationBucket, hexDigest, prevObjectSize, splitter);
});
},
(partLocations, oldLocations, objectLocationConstraint, destinationBucket,
hexDigest, prevObjectSize, splitter, next) => {
if (!oldLocations) {
return next(null, oldLocations, objectLocationConstraint,
destinationBucket, hexDigest, prevObjectSize);
}
return services.isCompleteMPUInProgress({
bucketName,
objectKey,
uploadId,
splitter,
}, log, (err, completeInProgress) => {
if (err) {
return next(err, destinationBucket);
}
let oldLocationsToDelete = oldLocations;
// Prevent deletion of old data if a completeMPU
// is already in progress because then there is no
// guarantee that the old location will not be the
// committed one.
if (completeInProgress) {
log.warn('not deleting old locations because CompleteMPU is in progress', {
method: 'objectPutPart::metadata.getObjectMD',
bucketName,
objectKey,
uploadId,
partLocations,
oldLocations,
});
oldLocationsToDelete = null;
}
return next(null, oldLocationsToDelete, objectLocationConstraint,
destinationBucket, hexDigest, prevObjectSize);
});
},
// Clean up any old data now that new metadata (with new
// data locations) has been stored.
(oldLocationsToDelete, objectLocationConstraint, destinationBucket, hexDigest,
(oldLocations, objectLocationConstraint, destinationBucket, hexDigest,
prevObjectSize, next) => {
if (oldLocationsToDelete) {
if (oldLocations) {
log.trace('overwriting mpu part, deleting data');
const delLog = logger.newRequestLoggerFromSerializedUids(
log.getSerializedUids());
return data.batchDelete(oldLocationsToDelete, request.method,
return data.batchDelete(oldLocations, request.method,
objectLocationConstraint, delLog, err => {
if (err) {
// if error, log the error and move on as it is not

View File

@ -9,8 +9,7 @@ const { pushMetric } = require('../utapi/utilities');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const metadata = require('../metadata/wrapper');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
const { data } = require('../data/wrapper');
const { parseTagXml } = s3middleware.tagging;
const REPLICATION_ACTION = 'PUT_TAGGING';
@ -90,13 +89,10 @@ function objectPutTagging(authInfo, request, log, callback) {
log, err =>
next(err, bucket, objectMD));
},
(bucket, objectMD, next) => {
if (config.backends.data === 'multiple') {
return multipleBackendGateway.objectTagging('Put', objectKey,
bucket, objectMD, log, err => next(err, bucket, objectMD));
}
return next(null, bucket, objectMD);
},
(bucket, objectMD, next) =>
// if external backend handles tagging
data.objectTagging('Put', objectKey, bucket, objectMD,
log, err => next(err, bucket, objectMD)),
], (err, bucket, objectMD) => {
const additionalResHeaders = collectCorsHeaders(request.headers.origin,
request.method, bucket);

View File

@ -147,7 +147,7 @@ function websiteGet(request, log, callback) {
'bucketGet', constants.publicId, null, log, request);
// if index object does not exist and bucket is private AWS
// returns 403 - AccessDenied error.
if (err === errors.NoSuchKey && !bucketAuthorized) {
if (err.is.NoSuchKey && !bucketAuthorized) {
returnErr = errors.AccessDenied;
}
return _errorActions(returnErr,

View File

@ -107,7 +107,7 @@ function websiteHead(request, log, callback) {
'bucketGet', constants.publicId, null, log, request);
// if index object does not exist and bucket is private AWS
// returns 403 - AccessDenied error.
if (err === errors.NoSuchKey && !bucketAuthorized) {
if (err.is.NoSuchKey && !bucketAuthorized) {
returnErr = errors.AccessDenied;
}
return _errorActions(returnErr, routingRules,

View File

@ -1,558 +0,0 @@
const { errors, s3middleware } = require('arsenal');
const AWS = require('aws-sdk');
const MD5Sum = s3middleware.MD5Sum;
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
const createLogger = require('../multipleBackendLogger');
const { prepareStream } = require('../../api/apiUtils/object/prepareStream');
const { logHelper, removeQuotes, trimXMetaPrefix } = require('./utils');
const { config } = require('../../Config');
const missingVerIdInternalError = errors.InternalError.customizeDescription(
'Invalid state. Please ensure versioning is enabled ' +
'in AWS for the location constraint and try again.'
);
class AwsClient {
constructor(config) {
this.clientType = 'aws_s3';
this._s3Params = config.s3Params;
this._awsBucketName = config.bucketName;
this._bucketMatch = config.bucketMatch;
this._dataStoreName = config.dataStoreName;
this._serverSideEncryption = config.serverSideEncryption;
this._client = new AWS.S3(this._s3Params);
}
_createAwsKey(requestBucketName, requestObjectKey,
bucketMatch) {
if (bucketMatch) {
return requestObjectKey;
}
return `${requestBucketName}/${requestObjectKey}`;
}
put(stream, size, keyContext, reqUids, callback) {
const awsKey = this._createAwsKey(keyContext.bucketName,
keyContext.objectKey, this._bucketMatch);
const metaHeaders = trimXMetaPrefix(keyContext.metaHeaders);
const log = createLogger(reqUids);
const putCb = (err, data) => {
if (err) {
logHelper(log, 'error', 'err from data backend',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
if (!data.VersionId) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName);
return callback(missingVerIdInternalError);
}
const dataStoreVersionId = data.VersionId;
return callback(null, awsKey, dataStoreVersionId);
};
const params = {
Bucket: this._awsBucketName,
Key: awsKey,
};
// we call data.put to create a delete marker, but it's actually a
// delete request in call to AWS
if (keyContext.isDeleteMarker) {
return this._client.deleteObject(params, putCb);
}
const uploadParams = params;
uploadParams.Metadata = metaHeaders;
uploadParams.ContentLength = size;
if (this._serverSideEncryption) {
uploadParams.ServerSideEncryption = 'AES256';
}
if (keyContext.tagging) {
uploadParams.Tagging = keyContext.tagging;
}
if (keyContext.contentType !== undefined) {
uploadParams.ContentType = keyContext.contentType;
}
if (keyContext.cacheControl !== undefined) {
uploadParams.CacheControl = keyContext.cacheControl;
}
if (keyContext.contentDisposition !== undefined) {
uploadParams.ContentDisposition = keyContext.contentDisposition;
}
if (keyContext.contentEncoding !== undefined) {
uploadParams.ContentEncoding = keyContext.contentEncoding;
}
if (!stream) {
return this._client.putObject(uploadParams, putCb);
}
uploadParams.Body = stream;
return this._client.upload(uploadParams, putCb);
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key, dataStoreVersionId } = objectGetInfo;
return this._client.headObject({
Bucket: this._awsBucketName,
Key: key,
VersionId: dataStoreVersionId,
}, err => {
if (err) {
logHelper(log, 'error', 'error heading object ' +
'from datastore', err, this._dataStoreName);
if (err.code === 'NotFound') {
const error = errors.ServiceUnavailable
.customizeDescription(
'Unexpected error from AWS: "NotFound". Data on AWS ' +
'may have been altered outside of CloudServer.'
);
return callback(error);
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback();
});
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
const { key, dataStoreVersionId } = objectGetInfo;
const request = this._client.getObject({
Bucket: this._awsBucketName,
Key: key,
VersionId: dataStoreVersionId,
Range: range ? `bytes=${range[0]}-${range[1]}` : null,
}).on('success', response => {
log.trace('AWS GET request response headers',
{ responseHeaders: response.httpResponse.headers });
});
const stream = request.createReadStream().on('error', err => {
logHelper(log, 'error', 'error streaming data from AWS',
err, this._dataStoreName);
return callback(err);
});
return callback(null, stream);
}
delete(objectGetInfo, reqUids, callback) {
const { key, dataStoreVersionId, deleteVersion } = objectGetInfo;
const log = createLogger(reqUids);
const params = {
Bucket: this._awsBucketName,
Key: key,
};
if (deleteVersion) {
params.VersionId = dataStoreVersionId;
}
return this._client.deleteObject(params, err => {
if (err) {
logHelper(log, 'error', 'error deleting object from ' +
'datastore', err, this._dataStoreName);
if (err.code === 'NoSuchVersion') {
// data may have been deleted directly from the AWS backend
// don't want to retry the delete and errors are not
// sent back to client anyway, so no need to return err
return callback();
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback();
});
}
healthcheck(location, callback) {
const awsResp = {};
this._client.headBucket({ Bucket: this._awsBucketName },
err => {
/* eslint-disable no-param-reassign */
if (err) {
awsResp[location] = { error: err, external: true };
return callback(null, awsResp);
}
return this._client.getBucketVersioning({
Bucket: this._awsBucketName },
(err, data) => {
if (err) {
awsResp[location] = { error: err, external: true };
} else if (!data.Status ||
data.Status === 'Suspended') {
awsResp[location] = {
versioningStatus: data.Status,
error: 'Versioning must be enabled',
external: true,
};
} else {
awsResp[location] = {
versioningStatus: data.Status,
message: 'Congrats! You own the bucket',
};
}
return callback(null, awsResp);
});
});
}
createMPU(key, metaHeaders, bucketName, websiteRedirectHeader, contentType,
cacheControl, contentDisposition, contentEncoding, tagging, log,
callback) {
const metaHeadersTrimmed = {};
Object.keys(metaHeaders).forEach(header => {
if (header.startsWith('x-amz-meta-')) {
const headerKey = header.substring(11);
metaHeadersTrimmed[headerKey] = metaHeaders[header];
}
});
Object.assign(metaHeaders, metaHeadersTrimmed);
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = {
Bucket: awsBucket,
Key: awsKey,
WebsiteRedirectLocation: websiteRedirectHeader,
Metadata: metaHeaders,
ContentType: contentType,
CacheControl: cacheControl,
ContentDisposition: contentDisposition,
ContentEncoding: contentEncoding,
};
if (tagging) {
params.Tagging = tagging;
}
return this._client.createMultipartUpload(params, (err, mpuResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback(null, mpuResObj);
});
}
uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) {
let hashedStream = stream;
if (request) {
const partStream = prepareStream(request, streamingV4Params,
log, callback);
hashedStream = new MD5Sum();
partStream.pipe(hashedStream);
}
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = { Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
Body: hashedStream, ContentLength: size,
PartNumber: partNumber };
return this._client.uploadPart(params, (err, partResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend ' +
'on uploadPart', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
// Because we manually add quotes to ETag later, remove quotes here
const noQuotesETag =
partResObj.ETag.substring(1, partResObj.ETag.length - 1);
const dataRetrievalInfo = {
key: awsKey,
dataStoreType: 'aws_s3',
dataStoreName: this._dataStoreName,
dataStoreETag: noQuotesETag,
};
return callback(null, dataRetrievalInfo);
});
}
listParts(key, uploadId, bucketName, partNumberMarker, maxParts, log,
callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = { Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
PartNumberMarker: partNumberMarker, MaxParts: maxParts };
return this._client.listParts(params, (err, partList) => {
if (err) {
logHelper(log, 'error', 'err from data backend on listPart',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
// build storedParts object to mimic Scality S3 backend returns
const storedParts = {};
storedParts.IsTruncated = partList.IsTruncated;
storedParts.Contents = [];
storedParts.Contents = partList.Parts.map(item => {
// We manually add quotes to ETag later, so remove quotes here
const noQuotesETag =
item.ETag.substring(1, item.ETag.length - 1);
return {
partNumber: item.PartNumber,
value: {
Size: item.Size,
ETag: noQuotesETag,
LastModified: item.LastModified,
},
};
});
return callback(null, storedParts);
});
}
/**
* completeMPU - complete multipart upload on AWS backend
* @param {object} jsonList - user-sent list of parts to include in
* final mpu object
* @param {object} mdInfo - object containing 3 keys: storedParts,
* mpuOverviewKey, and splitter
* @param {string} key - object key
* @param {string} uploadId - multipart upload id string
* @param {string} bucketName - name of bucket
* @param {RequestLogger} log - logger instance
* @param {function} callback - callback function
* @return {(Error|object)} - return Error if complete MPU fails, otherwise
* object containing completed object key, eTag, and contentLength
*/
completeMPU(jsonList, mdInfo, key, uploadId, bucketName, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const mpuError = {
InvalidPart: true,
InvalidPartOrder: true,
EntityTooSmall: true,
};
const partArray = [];
const partList = jsonList.Part;
partList.forEach(partObj => {
const partParams = { PartNumber: partObj.PartNumber[0],
ETag: partObj.ETag[0] };
partArray.push(partParams);
});
const mpuParams = {
Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
MultipartUpload: {
Parts: partArray,
},
};
const completeObjData = { key: awsKey };
return this._client.completeMultipartUpload(mpuParams,
(err, completeMpuRes) => {
if (err) {
if (mpuError[err.code]) {
logHelper(log, 'trace', 'err from data backend on ' +
'completeMPU', err, this._dataStoreName);
return callback(errors[err.code]);
}
logHelper(log, 'error', 'err from data backend on ' +
'completeMPU', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
if (!completeMpuRes.VersionId) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName);
return callback(missingVerIdInternalError);
}
// need to get content length of new object to store
// in our metadata
return this._client.headObject({ Bucket: awsBucket, Key: awsKey },
(err, objHeaders) => {
if (err) {
logHelper(log, 'trace', 'err from data backend on ' +
'headObject', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
// remove quotes from eTag because they're added later
completeObjData.eTag = completeMpuRes.ETag
.substring(1, completeMpuRes.ETag.length - 1);
completeObjData.dataStoreVersionId = completeMpuRes.VersionId;
completeObjData.contentLength = objHeaders.ContentLength;
return callback(null, completeObjData);
});
});
}
abortMPU(key, uploadId, bucketName, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const abortParams = {
Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
};
return this._client.abortMultipartUpload(abortParams, err => {
if (err) {
logHelper(log, 'error', 'There was an error aborting ' +
'the MPU on AWS S3. You should abort directly on AWS S3 ' +
'using the same uploadId.', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback();
});
}
objectPutTagging(key, bucket, objectMD, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucket, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
const tagParams = {
Bucket: awsBucket,
Key: awsKey,
VersionId: dataStoreVersionId,
};
const keyArray = Object.keys(objectMD.tags);
tagParams.Tagging = {};
tagParams.Tagging.TagSet = keyArray.map(key => {
const value = objectMD.tags[key];
return { Key: key, Value: value };
});
return this._client.putObjectTagging(tagParams, err => {
if (err) {
logHelper(log, 'error', 'error from data backend on ' +
'putObjectTagging', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback();
});
}
objectDeleteTagging(key, bucket, objectMD, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucket, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
const tagParams = {
Bucket: awsBucket,
Key: awsKey,
VersionId: dataStoreVersionId,
};
return this._client.deleteObjectTagging(tagParams, err => {
if (err) {
logHelper(log, 'error', 'error from data backend on ' +
'deleteObjectTagging', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
return callback();
});
}
copyObject(request, destLocationConstraintName, sourceKey,
sourceLocationConstraintName, storeMetadataParams, log, callback) {
const destBucketName = request.bucketName;
const destObjectKey = request.objectKey;
const destAwsKey = this._createAwsKey(destBucketName, destObjectKey,
this._bucketMatch);
const sourceAwsBucketName =
config.getAwsBucketName(sourceLocationConstraintName);
const metadataDirective = request.headers['x-amz-metadata-directive'];
const metaHeaders = trimXMetaPrefix(getMetaHeaders(request.headers));
const awsParams = {
Bucket: this._awsBucketName,
Key: destAwsKey,
CopySource: `${sourceAwsBucketName}/${sourceKey}`,
Metadata: metaHeaders,
MetadataDirective: metadataDirective,
};
if (destLocationConstraintName &&
config.isAWSServerSideEncryption(destLocationConstraintName)) {
awsParams.ServerSideEncryption = 'AES256';
}
this._client.copyObject(awsParams, (err, copyResult) => {
if (err) {
if (err.code === 'AccessDenied') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceAwsBucketName} AWS bucket`, err,
this._dataStoreName);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceAwsBucketName} AWS bucket`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'copyObject', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
if (!copyResult.VersionId) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName);
return callback(missingVerIdInternalError);
}
return callback(null, destAwsKey, copyResult.VersionId);
});
}
uploadPartCopy(request, awsSourceKey, sourceLocationConstraintName,
log, callback) {
const destBucketName = request.bucketName;
const destObjectKey = request.objectKey;
const destAwsKey = this._createAwsKey(destBucketName, destObjectKey,
this._bucketMatch);
const sourceAwsBucketName =
config.getAwsBucketName(sourceLocationConstraintName);
const uploadId = request.query.uploadId;
const partNumber = request.query.partNumber;
const copySourceRange = request.headers['x-amz-copy-source-range'];
const params = {
Bucket: this._awsBucketName,
CopySource: `${sourceAwsBucketName}/${awsSourceKey}`,
CopySourceRange: copySourceRange,
Key: destAwsKey,
PartNumber: partNumber,
UploadId: uploadId,
};
return this._client.uploadPartCopy(params, (err, res) => {
if (err) {
if (err.code === 'AccessDenied') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceAwsBucketName} AWS bucket`, err,
this._dataStoreName);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceAwsBucketName} AWS bucket`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'uploadPartCopy', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
const eTag = removeQuotes(res.CopyPartResult.ETag);
return callback(null, eTag);
});
}
}
module.exports = AwsClient;

View File

@ -1,434 +0,0 @@
const url = require('url');
const { errors, s3middleware } = require('arsenal');
const azure = require('azure-storage');
const createLogger = require('../multipleBackendLogger');
const { logHelper, translateAzureMetaHeaders } = require('./utils');
const { config } = require('../../Config');
const { validateAndFilterMpuParts } =
require('../../api/apiUtils/object/processMpuParts');
const constants = require('../../../constants');
const metadata = require('../../metadata/wrapper');
const azureMpuUtils = s3middleware.azureHelper.mpuUtils;
class AzureClient {
constructor(config) {
this._azureStorageEndpoint = config.azureStorageEndpoint;
this._azureStorageCredentials = config.azureStorageCredentials;
this._azureContainerName = config.azureContainerName;
this._client = azure.createBlobService(
this._azureStorageCredentials.storageAccountName,
this._azureStorageCredentials.storageAccessKey,
this._azureStorageEndpoint);
this._dataStoreName = config.dataStoreName;
this._bucketMatch = config.bucketMatch;
if (config.proxy) {
const parsedUrl = url.parse(config.proxy);
if (!parsedUrl.port) {
parsedUrl.port = 80;
}
this._client.setProxy(parsedUrl);
}
}
_errorWrapper(s3Method, azureMethod, args, log, cb) {
if (log) {
log.info(`calling azure ${azureMethod}`);
}
try {
this._client[azureMethod].apply(this._client, args);
} catch (err) {
const error = errors.ServiceUnavailable;
if (log) {
log.error('error thrown by Azure Storage Client Library',
{ error: err.message, stack: err.stack, s3Method,
azureMethod, dataStoreName: this._dataStoreName });
}
cb(error.customizeDescription('Error from Azure ' +
`method: ${azureMethod} on ${s3Method} S3 call: ` +
`${err.message}`));
}
}
_createAzureKey(requestBucketName, requestObjectKey,
bucketMatch) {
if (bucketMatch) {
return requestObjectKey;
}
return `${requestBucketName}/${requestObjectKey}`;
}
_getMetaHeaders(objectMD) {
const metaHeaders = {};
Object.keys(objectMD).forEach(mdKey => {
const isMetaHeader = mdKey.startsWith('x-amz-meta-');
if (isMetaHeader) {
metaHeaders[mdKey] = objectMD[mdKey];
}
});
return translateAzureMetaHeaders(metaHeaders);
}
// Before putting or deleting object on Azure, check if MPU exists with
// same key name. If it does, do not allow put or delete because Azure
// will delete all blocks with same key name
protectAzureBlocks(bucketName, objectKey, dataStoreName, log, cb) {
const mpuBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
const splitter = constants.splitter;
const listingParams = {
prefix: `overview${splitter}${objectKey}`,
listingType: 'MPU',
splitter,
maxKeys: 1,
};
return metadata.listMultipartUploads(mpuBucketName, listingParams,
log, (err, mpuList) => {
if (err && !err.NoSuchBucket) {
log.error('Error listing MPUs for Azure delete',
{ error: err, dataStoreName });
return cb(errors.ServiceUnavailable);
}
if (mpuList && mpuList.Uploads && mpuList.Uploads.length > 0) {
const error = errors.MPUinProgress;
log.error('Error: cannot put/delete object to Azure with ' +
'same key name as ongoing MPU on Azure',
{ error, dataStoreName });
return cb(error);
}
// If listMultipartUploads returns a NoSuchBucket error or the
// mpu list is empty, there are no conflicting MPUs, so continue
return cb();
});
}
put(stream, size, keyContext, reqUids, callback) {
const log = createLogger(reqUids);
// before blob is put, make sure there is no ongoing MPU with same key
this.protectAzureBlocks(keyContext.bucketName,
keyContext.objectKey, this._dataStoreName, log, err => {
// if error returned, there is ongoing MPU, so do not put
if (err) {
return callback(err.customizeDescription(
`Error putting object to Azure: ${err.message}`));
}
const azureKey = this._createAzureKey(keyContext.bucketName,
keyContext.objectKey, this._bucketMatch);
const options = {
metadata: translateAzureMetaHeaders(keyContext.metaHeaders,
keyContext.tagging),
contentSettings: {
contentType: keyContext.contentType || undefined,
cacheControl: keyContext.cacheControl || undefined,
contentDisposition: keyContext.contentDisposition ||
undefined,
contentEncoding: keyContext.contentEncoding || undefined,
},
};
if (size === 0) {
return this._errorWrapper('put', 'createBlockBlobFromText',
[this._azureContainerName, azureKey, '', options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
}
return this._errorWrapper('put', 'createBlockBlobFromStream',
[this._azureContainerName, azureKey, stream, size, options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
});
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key, azureStreamingOptions } = objectGetInfo;
return this._errorWrapper('head', 'getBlobProperties',
[this._azureContainerName, key, azureStreamingOptions,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure HEAD data backend',
err, this._dataStoreName);
if (err.code === 'NotFound') {
const error = errors.ServiceUnavailable
.customizeDescription(
'Unexpected error from Azure: "NotFound". Data ' +
'on Azure may have been altered outside of ' +
'CloudServer.');
return callback(error);
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const { key, response, azureStreamingOptions } = objectGetInfo;
let streamingOptions;
if (azureStreamingOptions) {
// option coming from api.get()
streamingOptions = azureStreamingOptions;
} else if (range) {
// option coming from multipleBackend.upload()
const rangeStart = range[0] ? range[0].toString() : undefined;
const rangeEnd = range[1] ? range[1].toString() : undefined;
streamingOptions = { rangeStart, rangeEnd };
}
this._errorWrapper('get', 'getBlobToStream',
[this._azureContainerName, key, response, streamingOptions,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure GET data backend',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback(null, response);
}], log, callback);
}
delete(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const key = typeof objectGetInfo === 'string' ? objectGetInfo :
objectGetInfo.key;
return this._errorWrapper('delete', 'deleteBlobIfExists',
[this._azureContainerName, key,
err => {
if (err) {
const log = createLogger(reqUids);
logHelper(log, 'error', 'error deleting object from ' +
'Azure datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
}
healthcheck(location, callback, flightCheckOnStartUp) {
const azureResp = {};
const healthCheckAction = flightCheckOnStartUp ?
'createContainerIfNotExists' : 'doesContainerExist';
this._errorWrapper('checkAzureHealth', healthCheckAction,
[this._azureContainerName, err => {
/* eslint-disable no-param-reassign */
if (err) {
azureResp[location] = { error: err.message,
external: true };
return callback(null, azureResp);
}
azureResp[location] = {
message:
'Congrats! You can access the Azure storage account',
};
return callback(null, azureResp);
}], null, callback);
}
uploadPart(request, streamingV4Params, partStream, size, key, uploadId,
partNumber, bucket, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const params = { bucketName: this._azureContainerName,
partNumber, size, objectKey: azureKey, uploadId };
const stream = request || partStream;
if (request && request.headers && request.headers['content-md5']) {
params.contentMD5 = request.headers['content-md5'];
}
const dataRetrievalInfo = {
key: azureKey,
partNumber,
dataStoreName: this._dataStoreName,
dataStoreType: 'azure',
numberSubParts: azureMpuUtils.getSubPartInfo(size)
.expectedNumberSubParts,
};
if (size === 0) {
log.debug('0-byte part does not store data',
{ method: 'uploadPart' });
dataRetrievalInfo.dataStoreETag = azureMpuUtils.zeroByteETag;
dataRetrievalInfo.numberSubParts = 0;
return callback(null, dataRetrievalInfo);
}
if (size <= azureMpuUtils.maxSubPartSize) {
const errorWrapperFn = this._errorWrapper.bind(this);
return azureMpuUtils.putSinglePart(errorWrapperFn,
stream, params, this._dataStoreName, log, (err, dataStoreETag) => {
if (err) {
return callback(err);
}
dataRetrievalInfo.dataStoreETag = dataStoreETag;
return callback(null, dataRetrievalInfo);
});
}
const errorWrapperFn = this._errorWrapper.bind(this);
return azureMpuUtils.putSubParts(errorWrapperFn, stream,
params, this._dataStoreName, log, (err, dataStoreETag) => {
if (err) {
return callback(err);
}
dataRetrievalInfo.dataStoreETag = dataStoreETag;
return callback(null, dataRetrievalInfo);
});
}
completeMPU(jsonList, mdInfo, key, uploadId, bucket, metaHeaders,
contentSettings, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const commitList = {
UncommittedBlocks: jsonList.uncommittedBlocks || [],
};
let filteredPartsObj;
if (!jsonList.uncommittedBlocks) {
const { storedParts, mpuOverviewKey, splitter } = mdInfo;
filteredPartsObj = validateAndFilterMpuParts(storedParts, jsonList,
mpuOverviewKey, splitter, log);
filteredPartsObj.partList.forEach(part => {
// part.locations is always array of 1, which contains data info
const subPartIds =
azureMpuUtils.getSubPartIds(part.locations[0], uploadId);
commitList.UncommittedBlocks.push(...subPartIds);
});
}
const options = {
contentSettings,
metadata: metaHeaders ? translateAzureMetaHeaders(metaHeaders) :
undefined,
};
this._errorWrapper('completeMPU', 'commitBlocks',
[this._azureContainerName, azureKey, commitList, options,
err => {
if (err) {
logHelper(log, 'error', 'err completing MPU on Azure ' +
'datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
const completeObjData = {
key: azureKey,
filteredPartsObj,
};
return callback(null, completeObjData);
}], log, callback);
}
objectPutTagging(key, bucket, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
azureMD.tags = JSON.stringify(objectMD.tags);
this._errorWrapper('objectPutTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
objectDeleteTagging(key, bucket, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
this._errorWrapper('objectDeleteTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
copyObject(request, destLocationConstraintName, sourceKey,
sourceLocationConstraintName, storeMetadataParams, log, callback) {
const destContainerName = request.bucketName;
const destObjectKey = request.objectKey;
const destAzureKey = this._createAzureKey(destContainerName,
destObjectKey, this._bucketMatch);
const sourceContainerName =
config.locationConstraints[sourceLocationConstraintName]
.details.azureContainerName;
let options;
if (storeMetadataParams.metaHeaders) {
options = { metadata:
translateAzureMetaHeaders(storeMetadataParams.metaHeaders) };
}
this._errorWrapper('copyObject', 'startCopyBlob',
[`${this._azureStorageEndpoint}` +
`${sourceContainerName}/${sourceKey}`,
this._azureContainerName, destAzureKey, options,
(err, res) => {
if (err) {
if (err.code === 'CannotVerifyCopySource') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceContainerName} Azure Container`, err,
this._dataStoreName);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceContainerName} Azure Container`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'copyObject', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
if (res.copy.status === 'pending') {
logHelper(log, 'error', 'Azure copy status is pending',
err, this._dataStoreName);
const copyId = res.copy.id;
this._client.abortCopyBlob(this._azureContainerName,
destAzureKey, copyId, err => {
if (err) {
logHelper(log, 'error', 'error from data backend ' +
'on abortCopyBlob', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS on abortCopyBlob: ${err.message}`)
);
}
return callback(errors.InvalidObjectState
.customizeDescription('Error: Azure copy status was ' +
'pending. It has been aborted successfully')
);
});
}
return callback(null, destAzureKey);
}], log, callback);
}
}
module.exports = AzureClient;

View File

@ -1,246 +0,0 @@
const googleAuth = require('google-auto-auth');
const async = require('async');
const AWS = require('aws-sdk');
const { errors } = require('arsenal');
const Service = AWS.Service;
const GcpSigner = require('./GcpSigner');
function genAuth(authParams, callback) {
async.tryEach([
function authKeyFile(next) {
const authOptions = {
scopes: authParams.scopes,
keyFilename: authParams.keyFilename,
};
const auth = googleAuth(authOptions);
auth.getToken(err => next(err, auth));
},
function authCredentials(next) {
const authOptions = {
scopes: authParams.scopes,
credentials: authParams.credentials,
};
const auth = googleAuth(authOptions);
auth.getToken(err => next(err, auth));
},
], (err, result) => callback(err, result));
}
AWS.apiLoader.services.gcp = {};
const GCP = Service.defineService('gcp', ['2017-11-01'], {
_jsonAuth: null,
_authParams: null,
getToken(callback) {
if (this._jsonAuth) {
return this._jsonAuth.getToken(callback);
}
if (!this._authParams && this.config.authParams &&
typeof this.config.authParams === 'object') {
this._authParams = this.config.authParams;
}
return genAuth(this._authParams, (err, auth) => {
if (!err) {
this._jsonAuth = auth;
return this._jsonAuth.getToken(callback);
}
// should never happen, but if all preconditions fails
// can't generate tokens
return callback(errors.InternalError.customizeDescription(
'Unable to create a google authorizer'));
});
},
getSignerClass() {
return GcpSigner;
},
validateService() {
if (!this.config.region) {
this.config.region = 'us-east-1';
}
},
upload(params, options, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: upload not implemented'));
},
// Service API
listBuckets(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: listBuckets not implemented'));
},
// Bucket APIs
getBucketLocation(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketLocation not implemented'));
},
deleteBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucket not implemented'));
},
headBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: headBucket not implemented'));
},
listObjects(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: listObjects not implemented'));
},
listObjectVersions(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: listObjecVersions not implemented'));
},
putBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucket not implemented'));
},
getBucketAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketAcl not implemented'));
},
putBucketAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketAcl not implemented'));
},
putBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketWebsite not implemented'));
},
getBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketWebsite not implemented'));
},
deleteBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucketWebsite not implemented'));
},
putBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketCors not implemented'));
},
getBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketCors not implemented'));
},
deleteBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucketCors not implemented'));
},
// Object APIs
headObject(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: headObject not implemented'));
},
putObject(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putObject not implemented'));
},
getObject(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getObject not implemented'));
},
deleteObject(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteObject not implemented'));
},
deleteObjects(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteObjects not implemented'));
},
copyObject(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: copyObject not implemented'));
},
putObjectTagging(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putObjectTagging not implemented'));
},
deleteObjectTagging(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteObjectTagging not implemented'));
},
putObjectAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putObjectAcl not implemented'));
},
getObjectAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getObjectAcl not implemented'));
},
// Multipart upload
abortMultipartUpload(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: abortMultipartUpload not implemented'));
},
createMultipartUpload(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: createMultipartUpload not implemented'));
},
completeMultipartUpload(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: completeMultipartUpload not implemented'));
},
uploadPart(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: uploadPart not implemented'));
},
uploadPartCopy(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: uploadPartCopy not implemented'));
},
listParts(params, callback) {
return callback(errors.NotImplemented
.customizeDescription(
'GCP: listParts not implemented'));
},
});
Object.defineProperty(AWS.apiLoader.services.gcp, '2017-11-01', {
get: function get() {
const model = require('./gcp-2017-11-01.api.json');
return model;
},
enumerable: true,
configurable: true,
});
module.exports = GCP;

View File

@ -1,48 +0,0 @@
const url = require('url');
const qs = require('querystring');
const AWS = require('aws-sdk');
const werelogs = require('werelogs');
const { constructStringToSignV2 } = require('arsenal').auth.client;
const logger = new werelogs.Logger('GcpSigner');
function genQueryObject(uri) {
const queryString = url.parse(uri).query;
return qs.parse(queryString);
}
const GcpSigner = AWS.util.inherit(AWS.Signers.RequestSigner, {
constructor: function GcpSigner(request) {
AWS.Signers.RequestSigner.call(this, request);
},
addAuthorization: function addAuthorization(credentials, date) {
if (!this.request.headers['presigned-expires']) {
this.request.headers['x-goog-date'] = AWS.util.date.rfc822(date);
}
const signature =
this.sign(credentials.secretAccessKey, this.stringToSign());
const auth = `GOOG1 ${credentials.accessKeyId}: ${signature}`;
this.request.headers.Authorization = auth;
},
stringToSign: function stringToSign() {
const requestObject = {
url: this.request.path,
method: this.request.method,
host: this.request.endpoint.host,
headers: this.request.headers,
query: genQueryObject(this.request.path) || {},
};
const data = Object.assign({}, this.request.headers);
return constructStringToSignV2(requestObject, data, logger, 'GCP');
},
sign: function sign(secret, string) {
return AWS.util.crypto.hmac(secret, string, 'base64', 'sha1');
},
});
module.exports = GcpSigner;

View File

@ -1 +0,0 @@
module.exports = {};

View File

@ -1,17 +0,0 @@
{
"version": "1.0",
"metadata": {
"apiVersion": "2017-11-01",
"checksumFormat": "md5",
"endpointPrefix": "s3",
"globalEndpoint": "storage.googleapi.com",
"protocol": "rest-xml",
"serviceAbbreviation": "GCP",
"serviceFullName": "Google Cloud Storage",
"signatureVersion": "s3",
"timestampFormat": "rfc822",
"uid": "gcp-2017-11-01"
},
"operations": {},
"shapes": {}
}

View File

@ -1,5 +0,0 @@
module.exports = {
GCP: require('./GcpService'),
GcpSigner: require('./GcpSigner'),
GcpUtils: require('./GcpUtils'),
};

View File

@ -1,39 +0,0 @@
const { GCP } = require('./GCP');
const AwsClient = require('./AwsClient');
/**
* Class representing a Google Cloud Storage backend object
* @extends AwsClient
*/
class GcpClient extends AwsClient {
/**
* constructor - creates a Gcp backend client object (inherits )
* @param {object} config - configuration object for Gcp Backend up
* @param {object} config.s3params - S3 configuration
* @param {string} config.bucketName - GCP bucket name
* @param {string} config.mpuBucket - GCP mpu bucket name
* @param {string} config.overflowBucket - GCP overflow bucket name
* @param {boolean} config.bucketMatch - bucket match flag
* @param {object} config.authParams - GCP service credentials
* @param {string} config.dataStoreName - locationConstraint name
* @param {booblean} config.serverSideEncryption - server side encryption
* flag
* @return {object} - returns a GcpClient object
*/
constructor(config) {
super(config);
this.clientType = 'gcp';
this._gcpBucketName = config.bucketName;
this._mpuBucketName = config.mpuBucket;
this._overflowBucketname = config.overflowBucket;
this._gcpParams = Object.assign(this._s3Params, {
mainBucket: this._gcpBucketName,
mpuBucket: this._mpuBucketName,
overflowBucket: this._overflowBucketname,
authParams: config.authParams,
});
this._client = new GCP(this._gcpParams);
}
}
module.exports = GcpClient;

View File

@ -1,147 +0,0 @@
const async = require('async');
const constants = require('../../../constants');
const { config } = require('../../../lib/Config');
const awsHealth = {
response: undefined,
time: 0,
};
const azureHealth = {
response: undefined,
time: 0,
};
const utils = {
logHelper(log, level, description, error, dataStoreName) {
log[level](description, { error: error.message,
errorName: error.name, dataStoreName });
},
// take off the 'x-amz-meta-'
trimXMetaPrefix(obj) {
const newObj = {};
Object.keys(obj).forEach(key => {
const newKey = key.substring(11);
newObj[newKey] = obj[key];
});
return newObj;
},
removeQuotes(word) {
return word.slice(1, -1);
},
skipMpuPartProcessing(completeObjData) {
const backendType = completeObjData.dataStoreType;
if (constants.mpuMDStoredExternallyBackend[backendType]) {
return true;
}
return false;
},
/**
* checkAzureBackendMatch - checks that the external backend location for
* two data objects is the same and is Azure
* @param {array} objectDataOne - data of first object to compare
* @param {object} objectDataTwo - data of second object to compare
* @return {boolean} - true if both data backends are Azure, false if not
*/
checkAzureBackendMatch(objectDataOne, objectDataTwo) {
if (objectDataOne.dataStoreType === 'azure' &&
objectDataTwo.dataStoreType === 'azure') {
return true;
}
return false;
},
/**
* externalBackendCopy - Server side copy should only be allowed:
* 1) if source object and destination object are both on aws or both
* on azure.
* 2) if azure to azure, must be the same storage account since Azure
* copy outside of an account is async
* 3) if the source bucket is not an encrypted bucket and the
* destination bucket is not an encrypted bucket (unless the copy
* is all within the same bucket).
* @param {string} locationConstraintSrc - location constraint of the source
* @param {string} locationConstraintDest - location constraint of the
* destination
* @param {object} sourceBucketMD - metadata of the source bucket
* @param {object} destBucketMD - metadata of the destination bucket
* @return {boolean} - true if copying object from one
* externalbackend to the same external backend and for Azure if it is the
* same account since Azure copy outside of an account is async
*/
externalBackendCopy(locationConstraintSrc, locationConstraintDest,
sourceBucketMD, destBucketMD) {
const sourceBucketName = sourceBucketMD.getName();
const destBucketName = destBucketMD.getName();
const isSameBucket = sourceBucketName === destBucketName;
const bucketsNotEncrypted = destBucketMD.getServerSideEncryption()
=== sourceBucketMD.getServerSideEncryption() &&
sourceBucketMD.getServerSideEncryption() === null;
const sourceLocationConstraintType =
config.getLocationConstraintType(locationConstraintSrc);
const locationTypeMatch =
config.getLocationConstraintType(locationConstraintSrc) ===
config.getLocationConstraintType(locationConstraintDest);
return locationTypeMatch && (isSameBucket || bucketsNotEncrypted) &&
(sourceLocationConstraintType === 'aws_s3' ||
(sourceLocationConstraintType === 'azure' &&
config.isSameAzureAccount(locationConstraintSrc,
locationConstraintDest)));
},
checkExternalBackend(clients, locations, type, flightCheckOnStartUp,
externalBackendHealthCheckInterval, cb) {
const checkStatus = type === 'aws_s3' ? awsHealth : azureHealth;
if (locations.length === 0) {
return process.nextTick(cb, null, []);
}
if (!flightCheckOnStartUp && checkStatus.response &&
Date.now() - checkStatus.time
< externalBackendHealthCheckInterval) {
return process.nextTick(cb, null, checkStatus.response);
}
let locationsToCheck;
if (flightCheckOnStartUp) {
// check all locations if flight check on start up
locationsToCheck = locations;
} else {
const randomLocation = locations[Math.floor(Math.random() *
locations.length)];
locationsToCheck = [randomLocation];
}
return async.mapLimit(locationsToCheck, 5, (location, next) => {
const client = clients[location];
client.healthcheck(location, next, flightCheckOnStartUp);
}, (err, results) => {
if (err) {
return cb(err);
}
if (!flightCheckOnStartUp) {
checkStatus.response = results;
checkStatus.time = Date.now();
}
return cb(null, results);
});
},
translateAzureMetaHeaders(metaHeaders, tags) {
const translatedMetaHeaders = {};
if (tags) {
// tags are passed as string of format 'key1=value1&key2=value2'
const tagObj = {};
const tagArr = tags.split('&');
tagArr.forEach(keypair => {
const equalIndex = keypair.indexOf('=');
const key = keypair.substring(0, equalIndex);
tagObj[key] = keypair.substring(equalIndex + 1);
});
translatedMetaHeaders.tags = JSON.stringify(tagObj);
}
Object.keys(metaHeaders).forEach(headerName => {
const translated = headerName.substring(11).replace(/-/g, '_');
translatedMetaHeaders[translated] = metaHeaders[headerName];
});
return translatedMetaHeaders;
},
};
module.exports = utils;

View File

@ -1,37 +0,0 @@
const arsenal = require('arsenal');
const { config } = require('../../Config');
class DataFileInterface {
constructor() {
const { host, port } = config.dataClient;
this.restClient = new arsenal.network.rest.RESTClient(
{ host, port });
}
put(stream, size, keyContext, reqUids, callback) {
// ignore keyContext
this.restClient.put(stream, size, reqUids, callback);
}
get(objectGetInfo, range, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
this.restClient.get(key, range, reqUids, callback);
}
delete(objectGetInfo, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
this.restClient.delete(key, reqUids, callback);
}
getDiskUsage(reqUids, callback) {
this.restClient.getAction('diskUsage', reqUids, (err, val) => {
if (err) {
return callback(err);
}
return callback(null, JSON.parse(val));
});
}
}
module.exports = DataFileInterface;

View File

@ -1,93 +0,0 @@
const stream = require('stream');
const { errors } = require('arsenal');
const werelogs = require('werelogs');
const logger = new werelogs.Logger('MemDataBackend');
function createLogger(reqUids) {
return reqUids ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger();
}
const ds = [];
let count = 1; // keys are assessed with if (!key)
function resetCount() {
count = 1;
}
const backend = {
put: function putMem(request, size, keyContext, reqUids, callback) {
const log = createLogger(reqUids);
const value = Buffer.alloc(size);
let cursor = 0;
let exceeded = false;
request.on('data', data => {
if (cursor + data.length > size) {
exceeded = true;
}
if (!exceeded) {
data.copy(value, cursor);
}
cursor += data.length;
})
.on('end', () => {
if (exceeded) {
log.error('data stream exceed announced size',
{ size, overflow: cursor });
callback(errors.InternalError);
} else {
ds[count] = { value, keyContext };
callback(null, count++);
}
});
},
get: function getMem(objectGetInfo, range, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
process.nextTick(() => {
if (!ds[key]) { return callback(errors.NoSuchKey); }
const storedBuffer = ds[key].value;
// If a range was sent, use the start from the range.
// Otherwise, start at 0
let start = range ? range[0] : 0;
// If a range was sent, use the end from the range.
// End of range should be included so +1
// Otherwise, get the full length
const end = range ? range[1] + 1 : storedBuffer.length;
const chunkSize = 64 * 1024; // 64KB
const val = new stream.Readable({
read: function read() {
// sets this._read under the hood
// push data onto the read queue, passing null
// will signal the end of the stream (EOF)
while (start < end) {
const finish =
Math.min(start + chunkSize, end);
this.push(storedBuffer.slice(start, finish));
start += chunkSize;
}
if (start >= end) {
this.push(null);
}
},
});
return callback(null, val);
});
},
delete: function delMem(objectGetInfo, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
process.nextTick(() => {
delete ds[key];
return callback(null);
});
},
};
module.exports = {
backend,
ds,
resetCount,
};

View File

@ -1,126 +0,0 @@
const http = require('http');
const https = require('https');
const AWS = require('aws-sdk');
const Sproxy = require('sproxydclient');
const DataFileBackend = require('./file/backend');
const inMemory = require('./in_memory/backend').backend;
const AwsClient = require('./external/AwsClient');
const GcpClient = require('./external/GcpClient');
const AzureClient = require('./external/AzureClient');
const { config } = require('../Config');
const proxyAddress = 'http://localhost:3128';
function parseLC() {
const clients = {};
Object.keys(config.locationConstraints).forEach(location => {
const locationObj = config.locationConstraints[location];
if (locationObj.type === 'mem') {
clients[location] = inMemory;
}
if (locationObj.type === 'file') {
clients[location] = new DataFileBackend();
}
if (locationObj.type === 'scality'
&& locationObj.details.connector.sproxyd) {
clients[location] = new Sproxy({
bootstrap: locationObj.details.connector
.sproxyd.bootstrap,
// Might be undefined which is ok since there is a default
// set in sproxydclient if chordCos is undefined
chordCos: locationObj.details.connector.sproxyd.chordCos,
// Might also be undefined, but there is a default path set
// in sproxydclient as well
path: locationObj.details.connector.sproxyd.path,
// enable immutable optim for all objects
immutable: true,
});
clients[location].clientType = 'scality';
}
if (locationObj.type === 'aws_s3' || locationObj.type === 'gcp') {
if (process.env.CI_PROXY === 'true') {
locationObj.details.https = false;
locationObj.details.proxy = proxyAddress;
}
const connectionAgent = locationObj.details.https ?
new https.Agent({ keepAlive: true }) :
new http.Agent({ keepAlive: true });
const protocol = locationObj.details.https ? 'https' : 'http';
const httpOptions = locationObj.details.proxy ?
{ proxy: locationObj.details.proxy, agent: connectionAgent,
timeout: 0 }
: { agent: connectionAgent, timeout: 0 };
const sslEnabled = locationObj.details.https === true;
// TODO: HTTP requests to AWS are not supported with V4 auth for
// non-file streams which are used by Backbeat. This option will be
// removed once CA certs, proxy setup feature is implemented.
const signatureVersion = !sslEnabled ? 'v2' : 'v4';
const endpoint = locationObj.type === 'gcp' ?
locationObj.details.gcpEndpoint :
locationObj.details.awsEndpoint;
const s3Params = {
endpoint: `${protocol}://${endpoint}`,
debug: false,
// Not implemented yet for streams in node sdk,
// and has no negative impact if stream, so let's
// leave it in for future use
computeChecksums: true,
httpOptions,
// needed for encryption
signatureVersion,
sslEnabled,
maxRetries: 0,
};
// users can either include the desired profile name from their
// ~/.aws/credentials file or include the accessKeyId and
// secretAccessKey directly in the locationConfig
if (locationObj.details.credentialsProfile) {
s3Params.credentials = new AWS.SharedIniFileCredentials({
profile: locationObj.details.credentialsProfile });
} else {
s3Params.accessKeyId =
locationObj.details.credentials.accessKey;
s3Params.secretAccessKey =
locationObj.details.credentials.secretKey;
}
const clientConfig = {
s3Params,
bucketName: locationObj.details.bucketName,
bucketMatch: locationObj.details.bucketMatch,
serverSideEncryption: locationObj.details.serverSideEncryption,
dataStoreName: location,
};
if (locationObj.type === 'gcp') {
clientConfig.overflowBucket =
locationObj.details.overflowBucketName;
clientConfig.mpuBucket = locationObj.details.mpuBucketName;
clientConfig.authParams = config.getGcpServiceParams(location);
}
clients[location] = locationObj.type === 'gcp' ?
new GcpClient(clientConfig) : new AwsClient(clientConfig);
}
if (locationObj.type === 'azure') {
if (process.env.CI_PROXY === 'true') {
locationObj.details.proxy = proxyAddress;
}
const azureStorageEndpoint = config.getAzureEndpoint(location);
const azureStorageCredentials =
config.getAzureStorageCredentials(location);
clients[location] = new AzureClient({
azureStorageEndpoint,
azureStorageCredentials,
azureContainerName: locationObj.details.azureContainerName,
bucketMatch: locationObj.details.bucketMatch,
dataStoreName: location,
proxy: locationObj.details.proxy,
});
clients[location].clientType = 'azure';
}
});
return clients;
}
module.exports = parseLC;

View File

@ -1,306 +0,0 @@
const { errors, s3middleware } = require('arsenal');
const { parseTagFromQuery } = s3middleware.tagging;
const createLogger = require('./multipleBackendLogger');
const async = require('async');
const { config } = require('../Config');
const parseLC = require('./locationConstraintParser');
const { checkExternalBackend } = require('./external/utils');
const { externalBackendHealthCheckInterval } = require('../../constants');
let clients = parseLC(config);
config.on('location-constraints-update', () => {
clients = parseLC(config);
});
const multipleBackendGateway = {
put: (hashedStream, size, keyContext,
backendInfo, reqUids, callback) => {
const controllingLocationConstraint =
backendInfo.getControllingLocationConstraint();
const client = clients[controllingLocationConstraint];
if (!client) {
const log = createLogger(reqUids);
log.error('no data backend matching controlling locationConstraint',
{ controllingLocationConstraint });
return process.nextTick(() => {
callback(errors.InternalError);
});
}
let writeStream = hashedStream;
if (keyContext.cipherBundle && keyContext.cipherBundle.cipher) {
writeStream = keyContext.cipherBundle.cipher;
hashedStream.pipe(writeStream);
}
if (keyContext.tagging) {
const validationTagRes = parseTagFromQuery(keyContext.tagging);
if (validationTagRes instanceof Error) {
const log = createLogger(reqUids);
log.debug('tag validation failed', {
error: validationTagRes,
method: 'multipleBackendGateway put',
});
return callback(errors.InternalError);
}
}
return client.put(writeStream, size, keyContext,
reqUids, (err, key, dataStoreVersionId) => {
const log = createLogger(reqUids);
log.debug('put to location', { controllingLocationConstraint });
if (err) {
log.error('error from datastore',
{ error: err, dataStoreType: client.clientType });
return callback(errors.ServiceUnavailable);
}
const dataRetrievalInfo = {
key,
dataStoreName: controllingLocationConstraint,
dataStoreType: client.clientType,
dataStoreVersionId,
};
return callback(null, dataRetrievalInfo);
});
},
head: (objectGetInfoArr, reqUids, callback) => {
if (!objectGetInfoArr || !Array.isArray(objectGetInfoArr)
|| !objectGetInfoArr[0] || !objectGetInfoArr[0].dataStoreName) {
// no-op if no stored data store name
return process.nextTick(callback);
}
const objectGetInfo = objectGetInfoArr[0];
const client = clients[objectGetInfo.dataStoreName];
if (client.head === undefined) {
// no-op if unsupported client method
return process.nextTick(callback);
}
return client.head(objectGetInfo, reqUids, callback);
},
get: (objectGetInfo, range, reqUids, callback) => {
let key;
let client;
// for backwards compatibility
if (typeof objectGetInfo === 'string') {
key = objectGetInfo;
client = clients.sproxyd;
} else {
key = objectGetInfo.key;
client = clients[objectGetInfo.dataStoreName];
}
if (client.clientType === 'scality') {
return client.get(key, range, reqUids, callback);
}
return client.get(objectGetInfo, range, reqUids, callback);
},
delete: (objectGetInfo, reqUids, callback) => {
let key;
let client;
// for backwards compatibility
if (typeof objectGetInfo === 'string') {
key = objectGetInfo;
client = clients.sproxyd;
} else {
key = objectGetInfo.key;
client = clients[objectGetInfo.dataStoreName];
}
if (client.clientType === 'scality') {
return client.delete(key, reqUids, callback);
}
return client.delete(objectGetInfo, reqUids, callback);
},
batchDelete: (dataStoreName, keys, log, callback) => {
const client = clients[dataStoreName];
if (client.batchDelete) {
log.debug('submitting keys for batch delete', { keys });
return client.batchDelete(keys, log.getSerializedUids(), callback);
}
return callback(errors.NotImplemented);
},
healthcheck: (flightCheckOnStartUp, log, callback) => {
const multBackendResp = {};
const awsArray = [];
const azureArray = [];
async.each(Object.keys(clients), (location, cb) => {
const client = clients[location];
if (client.clientType === 'scality') {
return client.healthcheck(log, (err, res) => {
if (err) {
multBackendResp[location] = { error: err };
} else {
multBackendResp[location] = { code: res.statusCode,
message: res.statusMessage };
}
return cb();
});
} else if (client.clientType === 'aws_s3') {
awsArray.push(location);
return cb();
} else if (client.clientType === 'azure') {
azureArray.push(location);
return cb();
}
// if backend type isn't 'scality' or 'aws_s3', it will be
// 'mem' or 'file', for which the default response is 200 OK
multBackendResp[location] = { code: 200, message: 'OK' };
return cb();
}, () => {
async.parallel([
next => checkExternalBackend(
clients, awsArray, 'aws_s3', flightCheckOnStartUp,
externalBackendHealthCheckInterval, next),
next => checkExternalBackend(
clients, azureArray, 'azure', flightCheckOnStartUp,
externalBackendHealthCheckInterval, next),
], (errNull, externalResp) => {
const externalLocResults = [];
externalResp.forEach(resp => externalLocResults.push(...resp));
externalLocResults.forEach(locationResult =>
Object.assign(multBackendResp, locationResult));
callback(null, multBackendResp);
});
});
},
createMPU: (key, metaHeaders, bucketName, websiteRedirectHeader,
location, contentType, cacheControl, contentDisposition,
contentEncoding, tagging, log, cb) => {
const client = clients[location];
if (client.clientType === 'aws_s3') {
return client.createMPU(key, metaHeaders, bucketName,
websiteRedirectHeader, contentType, cacheControl,
contentDisposition, contentEncoding, tagging, log, cb);
}
return cb();
},
isClientHandleMpu: location => {
const client = clients[location];
if (client.uploadPart) {
return true;
}
return false;
},
uploadPart: (request, streamingV4Params, stream, size, location, key,
uploadId, partNumber, bucketName, log, cb) => {
const client = clients[location];
if (client.uploadPart) {
return client.uploadPart(request, streamingV4Params, stream, size,
key, uploadId, partNumber, bucketName, log, cb);
}
return cb();
},
listParts: (key, uploadId, location, bucketName, partNumberMarker, maxParts,
log, cb) => {
const client = clients[location];
if (client.listParts) {
return client.listParts(key, uploadId, bucketName, partNumberMarker,
maxParts, log, cb);
}
return cb();
},
completeMPU: (key, uploadId, location, jsonList, mdInfo, bucketName,
userMetadata, contentSettings, log, cb) => {
const client = clients[location];
if (client.completeMPU) {
const args = [jsonList, mdInfo, key, uploadId, bucketName];
if (client.clientType === 'azure') {
args.push(userMetadata, contentSettings);
}
return client.completeMPU(...args, log, (err, completeObjData) => {
if (err) {
return cb(err);
}
// eslint-disable-next-line no-param-reassign
completeObjData.dataStoreType = client.clientType;
return cb(null, completeObjData);
});
}
return cb();
},
abortMPU: (key, uploadId, location, bucketName, log, cb) => {
const client = clients[location];
if (client.clientType === 'azure') {
const skipDataDelete = true;
return cb(null, skipDataDelete);
}
if (client.abortMPU) {
return client.abortMPU(key, uploadId, bucketName, log, err => {
if (err) {
return cb(err);
}
return cb();
});
}
return cb();
},
objectTagging: (method, key, bucket, objectMD, log, cb) => {
// if legacy, objectMD will not contain dataStoreName, so just return
const client = clients[objectMD.dataStoreName];
if (client && client[`object${method}Tagging`]) {
return client[`object${method}Tagging`](key, bucket, objectMD, log,
cb);
}
return cb();
},
// NOTE: using copyObject only if copying object from one external
// backend to the same external backend
copyObject: (request, destLocationConstraintName, externalSourceKey,
sourceLocationConstraintName, storeMetadataParams, log, cb) => {
const client = clients[destLocationConstraintName];
if (client.copyObject) {
return client.copyObject(request, destLocationConstraintName,
externalSourceKey, sourceLocationConstraintName,
storeMetadataParams, log, (err, key, dataStoreVersionId) => {
const dataRetrievalInfo = {
key,
dataStoreName: destLocationConstraintName,
dataStoreType: client.clientType,
dataStoreVersionId,
};
cb(err, dataRetrievalInfo);
});
}
return cb(errors.NotImplemented
.customizeDescription('Can not copy object from ' +
`${client.clientType} to ${client.clientType}`));
},
uploadPartCopy: (request, location, awsSourceKey,
sourceLocationConstraintName, log, cb) => {
const client = clients[location];
if (client.uploadPartCopy) {
return client.uploadPartCopy(request, awsSourceKey,
sourceLocationConstraintName,
log, cb);
}
return cb(errors.NotImplemented.customizeDescription(
'Can not copy object from ' +
`${client.clientType} to ${client.clientType}`));
},
protectAzureBlocks: (bucketName, objectKey, location, log, cb) => {
const client = clients[location];
if (client.protectAzureBlocks) {
return client.protectAzureBlocks(bucketName, objectKey, location,
log, cb);
}
return cb();
},
};
module.exports = multipleBackendGateway;

View File

@ -1,11 +0,0 @@
const werelogs = require('werelogs');
const logger = new werelogs.Logger('MultipleBackendGateway');
function createLogger(reqUids) {
return reqUids ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger();
}
module.exports = createLogger;

View File

@ -1,26 +1,18 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');
const PassThrough = require('stream').PassThrough;
const { storage } = require('arsenal');
const DataFileInterface = require('./file/backend');
const inMemory = require('./in_memory/backend').backend;
const locationConstraintCheck =
require('../api/apiUtils/object/locationConstraintCheck');
const multipleBackendGateway = require('./multipleBackendGateway');
const utils = require('./external/utils');
const { config } = require('../Config');
const MD5Sum = s3middleware.MD5Sum;
const NullStream = s3middleware.NullStream;
const assert = require('assert');
const kms = require('../kms/wrapper');
const externalBackends = require('../../constants').externalBackends;
const constants = require('../../constants');
const { BackendInfo } = require('../api/apiUtils/object/BackendInfo');
const RelayMD5Sum = require('../utilities/RelayMD5Sum');
const skipError = new Error('skip');
const metadata = require('../metadata/wrapper');
const vault = require('../auth/vault');
const locationStorageCheck =
require('../api/apiUtils/object/locationStorageCheck');
const { DataWrapper, MultipleBackendGateway, parseLC } = storage.data;
const { DataFileInterface } = storage.data.file;
const inMemory = storage.data.inMemory.datastore.backend;
let CdmiData;
try {
// eslint-disable-next-line import/no-unresolved
CdmiData = require('cdmiclient').CdmiData;
} catch (err) {
CdmiData = null;
@ -33,10 +25,12 @@ if (config.backends.data === 'mem') {
client = inMemory;
implName = 'mem';
} else if (config.backends.data === 'file') {
client = new DataFileInterface();
client = new DataFileInterface(config);
implName = 'file';
} else if (config.backends.data === 'multiple') {
client = multipleBackendGateway;
const clients = parseLC(config, vault);
client = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
implName = 'multipleBackends';
} else if (config.backends.data === 'cdmi') {
if (!CdmiData) {
@ -52,780 +46,16 @@ if (config.backends.data === 'mem') {
implName = 'cdmi';
}
/**
* _retryDelete - Attempt to delete key again if it failed previously
* @param { string | object } objectGetInfo - either string location of object
* to delete or object containing info of object to delete
* @param {object} log - Werelogs request logger
* @param {number} count - keeps count of number of times function has been run
* @param {function} cb - callback
* @returns undefined and calls callback
*/
const MAX_RETRY = 2;
// This check is done because on a put, complete mpu or copy request to
// Azure/AWS, if the object already exists on that backend, the existing object
// should not be deleted, which is the functionality for all other backends
function _shouldSkipDelete(locations, requestMethod, newObjDataStoreName) {
const skipMethods = { PUT: true, POST: true };
if (!Array.isArray(locations) || !locations[0] ||
!locations[0].dataStoreType) {
return false;
}
const isSkipBackend = externalBackends[locations[0].dataStoreType];
const isMatchingBackends =
locations[0].dataStoreName === newObjDataStoreName;
const isSkipMethod = skipMethods[requestMethod];
return (isSkipBackend && isMatchingBackends && isSkipMethod);
}
function _retryDelete(objectGetInfo, log, count, cb) {
if (count > MAX_RETRY) {
return cb(errors.InternalError);
}
return client.delete(objectGetInfo, log.getSerializedUids(), err => {
if (err) {
if (err.ObjNotFound) {
log.info('no such key in datastore',
{ objectGetInfo, implName, moreRetries: 'no' });
return cb(err);
}
log.error('delete error from datastore',
{ error: err, implName, moreRetries: 'yes' });
return _retryDelete(objectGetInfo, log, count + 1, cb);
}
return cb();
});
}
function _put(cipherBundle, value, valueSize,
keyContext, backendInfo, log, cb) {
assert.strictEqual(typeof valueSize, 'number');
log.debug('sending put to datastore', { implName, keyContext,
method: 'put' });
let hashedStream = null;
if (value) {
hashedStream = new MD5Sum();
value.pipe(hashedStream);
value.once('clientError', () => {
log.trace('destroying hashed stream');
hashedStream.destroy();
});
}
const data = new DataWrapper(
client, implName, config, kms, metadata, locationStorageCheck, vault);
config.on('location-constraints-update', () => {
if (implName === 'multipleBackends') {
// Need to send backendInfo to client.put and
// client.put will provide dataRetrievalInfo so no
// need to construct here
/* eslint-disable no-param-reassign */
keyContext.cipherBundle = cipherBundle;
return client.put(hashedStream,
valueSize, keyContext, backendInfo, log.getSerializedUids(),
(err, dataRetrievalInfo) => {
if (err) {
log.error('put error from datastore',
{ error: err, implName });
if (err.httpCode === 408) {
return cb(errors.IncompleteBody);
const clients = parseLC(config, vault);
client = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
data.switch(client);
}
return cb(errors.ServiceUnavailable);
}
return cb(null, dataRetrievalInfo, hashedStream);
});
}
/* eslint-enable no-param-reassign */
});
let writeStream = hashedStream;
if (cipherBundle && cipherBundle.cipher) {
writeStream = cipherBundle.cipher;
hashedStream.pipe(writeStream);
}
return client.put(writeStream, valueSize, keyContext,
log.getSerializedUids(), (err, key) => {
if (err) {
log.error('put error from datastore',
{ error: err, implName });
if (err.httpCode === 408) {
return cb(errors.IncompleteBody);
}
return cb(errors.InternalError);
}
const dataRetrievalInfo = {
key,
dataStoreName: implName,
};
return cb(null, dataRetrievalInfo, hashedStream);
});
}
const data = {
put: (cipherBundle, value, valueSize, keyContext, backendInfo, log, cb) => {
_put(cipherBundle, value, valueSize, keyContext, backendInfo, log,
(err, dataRetrievalInfo, hashedStream) => {
if (err) {
return cb(err);
}
if (hashedStream) {
if (hashedStream.completedHash) {
return cb(null, dataRetrievalInfo, hashedStream);
}
hashedStream.on('hashed', () => {
hashedStream.removeAllListeners('hashed');
return cb(null, dataRetrievalInfo, hashedStream);
});
return undefined;
}
return cb(null, dataRetrievalInfo);
});
},
head: (objectGetInfo, log, cb) => {
if (implName !== 'multipleBackends') {
// no-op if not multipleBackend implementation;
// head is used during get just to check external backend data state
return process.nextTick(cb);
}
return client.head(objectGetInfo, log.getSerializedUids(), cb);
},
get: (objectGetInfo, response, log, cb) => {
const isMdModelVersion2 = typeof(objectGetInfo) === 'string';
const isRequiredStringKey = constants.clientsRequireStringKey[implName];
const key = isMdModelVersion2 ? objectGetInfo : objectGetInfo.key;
const clientGetInfo = isRequiredStringKey ? key : objectGetInfo;
const range = objectGetInfo.range;
// If the key is explicitly set to null, the part to
// be read doesn't really exist and is only made of zeroes.
// This functionality is used by Scality-NFSD.
// Otherwise, the key is always defined
assert(key === null || key !== undefined);
if (key === null) {
cb(null, new NullStream(objectGetInfo.size, range));
return;
}
log.debug('sending get to datastore', { implName,
key, range, method: 'get' });
// We need to use response as a writable stream for AZURE GET
if (!isMdModelVersion2 && !isRequiredStringKey && response) {
clientGetInfo.response = response;
}
client.get(clientGetInfo, range, log.getSerializedUids(),
(err, stream) => {
if (err) {
log.error('get error from datastore',
{ error: err, implName });
return cb(errors.ServiceUnavailable);
}
if (objectGetInfo.cipheredDataKey) {
const serverSideEncryption = {
cryptoScheme: objectGetInfo.cryptoScheme,
masterKeyId: objectGetInfo.masterKeyId,
cipheredDataKey: Buffer.from(
objectGetInfo.cipheredDataKey, 'base64'),
};
const offset = objectGetInfo.range ?
objectGetInfo.range[0] : 0;
return kms.createDecipherBundle(
serverSideEncryption, offset, log,
(err, decipherBundle) => {
if (err) {
log.error('cannot get decipher bundle ' +
'from kms', {
method: 'data.wrapper.data.get',
});
return cb(err);
}
stream.pipe(decipherBundle.decipher);
return cb(null, decipherBundle.decipher);
});
}
return cb(null, stream);
});
},
delete: (objectGetInfo, log, cb) => {
const callback = cb || log.end;
const isMdModelVersion2 = typeof(objectGetInfo) === 'string';
const isRequiredStringKey = constants.clientsRequireStringKey[implName];
const key = isMdModelVersion2 ? objectGetInfo : objectGetInfo.key;
const clientGetInfo = isRequiredStringKey ? key : objectGetInfo;
log.trace('sending delete to datastore', {
implName, key, method: 'delete' });
// If the key is explicitly set to null, the part to
// be deleted doesn't really exist.
// This functionality is used by Scality-NFSD.
// Otherwise, the key is always defined
assert(key === null || key !== undefined);
if (key === null) {
callback(null);
return;
}
_retryDelete(clientGetInfo, log, 0, err => {
if (err && !err.ObjNotFound) {
log.error('delete error from datastore',
{ error: err, key: objectGetInfo.key, moreRetries: 'no' });
}
return callback(err);
});
},
batchDelete: (locations, requestMethod, newObjDataStoreName, log, cb) => {
// TODO: The method of persistence of sproxy delete key will
// be finalized; refer Issue #312 for the discussion. In the
// meantime, we at least log the location of the data we are
// about to delete before attempting its deletion.
if (_shouldSkipDelete(locations, requestMethod, newObjDataStoreName)) {
return process.nextTick(cb);
}
log.trace('initiating batch delete', {
keys: locations,
implName,
method: 'batchDelete',
});
const keys = [];
let backendName = '';
const shouldBatchDelete = locations.every(l => {
// legacy sproxyd location, should fallback to using regular delete
if (typeof l === 'string') {
return false;
}
const { dataStoreName, key } = l;
backendName = dataStoreName;
const type = config.getLocationConstraintType(dataStoreName);
// filter out possible `null` created by NFS
if (key && type === 'scality') {
keys.push(key);
return true;
}
return false;
});
if (shouldBatchDelete) {
return client.batchDelete(backendName, { keys }, log, cb);
}
return async.eachLimit(locations, 5, (loc, next) => {
process.nextTick(() => data.delete(loc, log, next));
},
err => {
if (err) {
log.end().error('batch delete failed', { error: err });
// deletion of non-existing objects result in 204
if (err.code === 404) {
return cb();
}
return cb(err);
}
log.end().trace('batch delete successfully completed');
return cb();
});
},
switch: newClient => {
client = newClient;
return client;
},
checkHealth: (log, cb, flightCheckOnStartUp) => {
if (!client.healthcheck) {
const defResp = {};
defResp[implName] = { code: 200, message: 'OK' };
return cb(null, defResp);
}
return client.healthcheck(flightCheckOnStartUp, log, (err, result) => {
let respBody = {};
if (err) {
log.error(`error from ${implName}`, { error: err });
respBody[implName] = {
error: err,
};
// error returned as null so async parallel doesn't return
// before all backends are checked
return cb(null, respBody);
}
if (implName === 'multipleBackends') {
respBody = result;
return cb(null, respBody);
}
respBody[implName] = {
code: result.statusCode,
message: result.statusMessage,
};
return cb(null, respBody);
});
},
getDiskUsage: (log, cb) => {
if (!client.getDiskUsage) {
log.debug('returning empty disk usage as fallback', { implName });
return cb(null, {});
}
return client.getDiskUsage(log.getSerializedUids(), cb);
},
/**
* _putForCopy - put used for copying object
* @param {object} cipherBundle - cipher bundle that encrypt the data
* @param {object} stream - stream containing the data
* @param {object} part - element of dataLocator array
* @param {object} dataStoreContext - information of the
* destination object
* dataStoreContext.bucketName: destination bucket name,
* dataStoreContext.owner: owner,
* dataStoreContext.namespace: request namespace,
* dataStoreContext.objectKey: destination object key name,
* @param {BackendInfo} destBackendInfo - Instance of BackendInfo:
* Represents the info necessary to evaluate which data backend to use
* on a data put call.
* @param {object} log - Werelogs request logger
* @param {function} cb - callback
* @returns {function} cb - callback
*/
_putForCopy: (cipherBundle, stream, part, dataStoreContext,
destBackendInfo, log, cb) => data.put(cipherBundle, stream,
part.size, dataStoreContext,
destBackendInfo, log,
(error, partRetrievalInfo) => {
if (error) {
return cb(error);
}
const partResult = {
key: partRetrievalInfo.key,
dataStoreName: partRetrievalInfo
.dataStoreName,
dataStoreType: partRetrievalInfo
.dataStoreType,
start: part.start,
size: part.size,
};
if (cipherBundle) {
partResult.cryptoScheme = cipherBundle.cryptoScheme;
partResult.cipheredDataKey = cipherBundle.cipheredDataKey;
}
if (part.dataStoreETag) {
partResult.dataStoreETag = part.dataStoreETag;
}
if (partRetrievalInfo.dataStoreVersionId) {
partResult.dataStoreVersionId =
partRetrievalInfo.dataStoreVersionId;
}
return cb(null, partResult);
}),
/**
* _dataCopyPut - put used for copying object with and without
* encryption
* @param {string} serverSideEncryption - Server side encryption
* @param {object} stream - stream containing the data
* @param {object} part - element of dataLocator array
* @param {object} dataStoreContext - information of the
* destination object
* dataStoreContext.bucketName: destination bucket name,
* dataStoreContext.owner: owner,
* dataStoreContext.namespace: request namespace,
* dataStoreContext.objectKey: destination object key name,
* @param {BackendInfo} destBackendInfo - Instance of BackendInfo:
* Represents the info necessary to evaluate which data backend to use
* on a data put call.
* @param {object} log - Werelogs request logger
* @param {function} cb - callback
* @returns {function} cb - callback
*/
_dataCopyPut: (serverSideEncryption, stream, part, dataStoreContext,
destBackendInfo, log, cb) => {
if (serverSideEncryption) {
return kms.createCipherBundle(
serverSideEncryption,
log, (err, cipherBundle) => {
if (err) {
log.debug('error getting cipherBundle');
return cb(errors.InternalError);
}
return data._putForCopy(cipherBundle, stream, part,
dataStoreContext, destBackendInfo, log, cb);
});
}
// Copied object is not encrypted so just put it
// without a cipherBundle
return data._putForCopy(null, stream, part, dataStoreContext,
destBackendInfo, log, cb);
},
/**
* copyObject - copy object
* @param {object} request - request object
* @param {string} sourceLocationConstraintName -
* source locationContraint name (awsbackend, azurebackend, ...)
* @param {object} storeMetadataParams - metadata information of the
* source object
* @param {array} dataLocator - source object metadata location(s)
* NOTE: for Azure and AWS data backend this array only has one item
* @param {object} dataStoreContext - information of the
* destination object
* dataStoreContext.bucketName: destination bucket name,
* dataStoreContext.owner: owner,
* dataStoreContext.namespace: request namespace,
* dataStoreContext.objectKey: destination object key name,
* @param {BackendInfo} destBackendInfo - Instance of BackendInfo:
* Represents the info necessary to evaluate which data backend to use
* on a data put call.
* @param {object} sourceBucketMD - metadata of the source bucket
* @param {object} destBucketMD - metadata of the destination bucket
* @param {object} serverSideEncryption - server side encryption configuration
* @param {object} log - Werelogs request logger
* @param {function} cb - callback
* @returns {function} cb - callback
*/
copyObject: (request,
sourceLocationConstraintName, storeMetadataParams, dataLocator,
dataStoreContext, destBackendInfo, sourceBucketMD, destBucketMD,
serverSideEncryption, log, cb) => {
if (config.backends.data === 'multiple' &&
utils.externalBackendCopy(sourceLocationConstraintName,
storeMetadataParams.dataStoreName, sourceBucketMD, destBucketMD)
&& serverSideEncryption === null) {
const destLocationConstraintName =
storeMetadataParams.dataStoreName;
const objectGetInfo = dataLocator[0];
const externalSourceKey = objectGetInfo.key;
return client.copyObject(request, destLocationConstraintName,
externalSourceKey, sourceLocationConstraintName,
storeMetadataParams, log, (error, objectRetrievalInfo) => {
if (error) {
return cb(error);
}
const putResult = {
key: objectRetrievalInfo.key,
dataStoreName: objectRetrievalInfo.
dataStoreName,
dataStoreType: objectRetrievalInfo.
dataStoreType,
dataStoreVersionId:
objectRetrievalInfo.dataStoreVersionId,
size: storeMetadataParams.size,
dataStoreETag: objectGetInfo.dataStoreETag,
start: objectGetInfo.start,
};
const putResultArr = [putResult];
return cb(null, putResultArr);
});
}
// dataLocator is an array. need to get and put all parts
// For now, copy 1 part at a time. Could increase the second
// argument here to increase the number of parts
// copied at once.
return async.mapLimit(dataLocator, 1,
// eslint-disable-next-line prefer-arrow-callback
function copyPart(part, copyCb) {
if (part.dataStoreType === 'azure') {
const passThrough = new PassThrough();
return async.parallel([
parallelCb => data.get(part, passThrough, log, err =>
parallelCb(err)),
parallelCb => data._dataCopyPut(serverSideEncryption,
passThrough,
part, dataStoreContext, destBackendInfo, log,
parallelCb),
], (err, res) => {
if (err) {
return copyCb(err);
}
return copyCb(null, res[1]);
});
}
return data.get(part, null, log, (err, stream) => {
if (err) {
return copyCb(err);
}
return data._dataCopyPut(serverSideEncryption, stream,
part, dataStoreContext, destBackendInfo, log, copyCb);
});
}, (err, results) => {
if (err) {
log.debug('error transferring data from source',
{ error: err });
return cb(err);
}
return cb(null, results);
});
},
_dataCopyPutPart: (request,
serverSideEncryption, stream, part,
dataStoreContext, destBackendInfo, locations, log, cb) => {
const numberPartSize =
Number.parseInt(part.size, 10);
const partNumber = Number.parseInt(request.query.partNumber, 10);
const uploadId = request.query.uploadId;
const destObjectKey = request.objectKey;
const destBucketName = request.bucketName;
const destLocationConstraintName = destBackendInfo
.getControllingLocationConstraint();
if (externalBackends[config
.locationConstraints[destLocationConstraintName]
.type]) {
return multipleBackendGateway.uploadPart(null, null,
stream, numberPartSize,
destLocationConstraintName, destObjectKey, uploadId,
partNumber, destBucketName, log,
(err, partInfo) => {
if (err) {
log.error('error putting ' +
'part to AWS', {
error: err,
method:
'objectPutCopyPart::' +
'multipleBackendGateway.' +
'uploadPart',
});
return cb(errors.ServiceUnavailable);
}
// skip to end of waterfall
// because don't need to store
// part metadata
if (partInfo &&
partInfo.dataStoreType === 'aws_s3') {
// if data backend handles MPU, skip to end
// of waterfall
const partResult = {
dataStoreETag: partInfo.dataStoreETag,
};
locations.push(partResult);
return cb(skipError, partInfo.dataStoreETag);
} else if (
partInfo &&
partInfo.dataStoreType === 'azure') {
const partResult = {
key: partInfo.key,
dataStoreName: partInfo.dataStoreName,
dataStoreETag: partInfo.dataStoreETag,
size: numberPartSize,
numberSubParts:
partInfo.numberSubParts,
partNumber: partInfo.partNumber,
};
locations.push(partResult);
return cb();
}
return cb(skipError);
});
}
if (serverSideEncryption) {
return kms.createCipherBundle(
serverSideEncryption,
log, (err, cipherBundle) => {
if (err) {
log.debug('error getting cipherBundle',
{ error: err });
return cb(errors.InternalError);
}
return data.put(cipherBundle, stream,
numberPartSize, dataStoreContext,
destBackendInfo, log,
(error, partRetrievalInfo,
hashedStream) => {
if (error) {
log.debug('error putting ' +
'encrypted part', { error });
return cb(error);
}
const partResult = {
key: partRetrievalInfo.key,
dataStoreName: partRetrievalInfo
.dataStoreName,
dataStoreETag: hashedStream
.completedHash,
// Do not include part start
// here since will change in
// final MPU object
size: numberPartSize,
sseCryptoScheme: cipherBundle
.cryptoScheme,
sseCipheredDataKey: cipherBundle
.cipheredDataKey,
sseAlgorithm: cipherBundle
.algorithm,
sseMasterKeyId: cipherBundle
.masterKeyId,
};
locations.push(partResult);
return cb();
});
});
}
// Copied object is not encrypted so just put it
// without a cipherBundle
return data.put(null, stream, numberPartSize,
dataStoreContext, destBackendInfo,
log, (error, partRetrievalInfo, hashedStream) => {
if (error) {
log.debug('error putting object part',
{ error });
return cb(error);
}
const partResult = {
key: partRetrievalInfo.key,
dataStoreName: partRetrievalInfo.dataStoreName,
dataStoreETag: hashedStream.completedHash,
size: numberPartSize,
};
locations.push(partResult);
return cb();
});
},
/**
* uploadPartCopy - put copy part
* @param {object} request - request object
* @param {object} log - Werelogs request logger
* @param {object} destBucketMD - destination bucket metadata
* @param {string} sourceLocationConstraintName -
* source locationContraint name (awsbackend, azurebackend, ...)
* @param {string} destLocationConstraintName -
* location of the destination MPU object (awsbackend, azurebackend, ...)
* @param {array} dataLocator - source object metadata location(s)
* NOTE: for Azure and AWS data backend this array
* @param {object} dataStoreContext - information of the
* destination object
* dataStoreContext.bucketName: destination bucket name,
* dataStoreContext.owner: owner,
* dataStoreContext.namespace: request namespace,
* dataStoreContext.objectKey: destination object key name,
* dataStoreContext.uploadId: uploadId
* dataStoreContext.partNumber: request.query.partNumber
* @param {function} callback - callback
* @returns {function} cb - callback
*/
uploadPartCopy: (request, log, destBucketMD, sourceLocationConstraintName,
destLocationConstraintName, dataLocator, dataStoreContext,
callback) => {
const serverSideEncryption = destBucketMD.getServerSideEncryption();
const lastModified = new Date().toJSON();
// skip if 0 byte object
if (dataLocator.length === 0) {
return process.nextTick(() => {
callback(null, constants.emptyFileMd5,
lastModified, serverSideEncryption, []);
});
}
// if destination mpu was initiated in legacy version
if (destLocationConstraintName === undefined) {
const backendInfoObj = locationConstraintCheck(request,
null, destBucketMD, log);
if (backendInfoObj.err) {
return process.nextTick(() => {
callback(backendInfoObj.err);
});
}
// eslint-disable-next-line no-param-reassign
destLocationConstraintName = backendInfoObj.controllingLC;
}
const locationTypeMatchAWS =
config.backends.data === 'multiple' &&
config.getLocationConstraintType(sourceLocationConstraintName) ===
config.getLocationConstraintType(destLocationConstraintName) &&
config.getLocationConstraintType(sourceLocationConstraintName) ===
'aws_s3';
// NOTE: using multipleBackendGateway.uploadPartCopy only if copying
// from AWS to AWS
if (locationTypeMatchAWS && dataLocator.length === 1) {
const awsSourceKey = dataLocator[0].key;
return multipleBackendGateway.uploadPartCopy(request,
destLocationConstraintName, awsSourceKey,
sourceLocationConstraintName, log, (error, eTag) => {
if (error) {
return callback(error);
}
return callback(skipError, eTag,
lastModified, serverSideEncryption);
});
}
const backendInfo = new BackendInfo(destLocationConstraintName);
// totalHash will be sent through the RelayMD5Sum transform streams
// to collect the md5 from multiple streams
let totalHash;
const locations = [];
// dataLocator is an array. need to get and put all parts
// in order so can get the ETag of full object
return async.forEachOfSeries(dataLocator,
// eslint-disable-next-line prefer-arrow-callback
function copyPart(part, index, cb) {
if (part.dataStoreType === 'azure') {
const passThrough = new PassThrough();
return async.parallel([
next => data.get(part, passThrough, log, err => {
if (err) {
log.error('error getting data part ' +
'from Azure', {
error: err,
method:
'objectPutCopyPart::' +
'multipleBackendGateway.' +
'copyPart',
});
return next(err);
}
return next();
}),
next => data._dataCopyPutPart(request,
serverSideEncryption, passThrough, part,
dataStoreContext, backendInfo, locations, log, next),
], err => {
if (err) {
return cb(err);
}
return cb();
});
}
return data.get(part, null, log, (err, stream) => {
if (err) {
log.debug('error getting object part',
{ error: err });
return cb(err);
}
const hashedStream =
new RelayMD5Sum(totalHash, updatedHash => {
totalHash = updatedHash;
});
stream.pipe(hashedStream);
// destLocationConstraintName is location of the
// destination MPU object
return data._dataCopyPutPart(request,
serverSideEncryption, hashedStream, part,
dataStoreContext, backendInfo, locations, log, cb);
});
}, err => {
// Digest the final combination of all of the part streams
if (err && err !== skipError) {
log.debug('error transferring data from source',
{ error: err, method: 'goGetData' });
return callback(err);
}
if (totalHash) {
totalHash = totalHash.digest('hex');
} else {
totalHash = locations[0].dataStoreETag;
}
if (err && err === skipError) {
return callback(skipError, totalHash,
lastModified, serverSideEncryption);
}
return callback(null, totalHash,
lastModified, serverSideEncryption, locations);
});
},
};
module.exports = data;
module.exports = { data, client, implName };

View File

@ -1,6 +1,5 @@
const { errors } = require('arsenal');
const getReplicationInfo = require('../api/apiUtils/object/getReplicationInfo');
const aclUtils = require('../utilities/aclUtils');
const constants = require('../../constants');
const metadata = require('../metadata/wrapper');
@ -17,12 +16,6 @@ const acl = {
log.trace('updating object acl in metadata');
// eslint-disable-next-line no-param-reassign
objectMD.acl = addACLParams;
const replicationInfo = getReplicationInfo(objectKey, bucket, true);
if (replicationInfo) {
// eslint-disable-next-line no-param-reassign
objectMD.replicationInfo = Object.assign({},
objectMD.replicationInfo, replicationInfo);
}
metadata.putObjectMD(bucket.getName(), objectKey, objectMD, params, log,
cb);
},

View File

@ -1,213 +0,0 @@
const assert = require('assert');
const bucketclient = require('bucketclient');
const BucketInfo = require('arsenal').models.BucketInfo;
const logger = require('../../utilities/logger');
const { config } = require('../../Config');
class BucketClientInterface {
constructor() {
assert(config.bucketd.bootstrap.length > 0,
'bucketd bootstrap list is empty');
const { bootstrap, log } = config.bucketd;
if (config.https) {
const { key, cert, ca } = config.https;
logger.info('bucketclient configuration', {
bootstrap,
log,
https: true,
});
this.client = new bucketclient.RESTClient(bootstrap, log, true,
key, cert, ca);
} else {
logger.info('bucketclient configuration', {
bootstrap,
log,
https: false,
});
this.client = new bucketclient.RESTClient(bootstrap, log);
}
}
createBucket(bucketName, bucketMD, log, cb) {
this.client.createBucket(bucketName, log.getSerializedUids(),
bucketMD.serialize(), cb);
return null;
}
getBucketAttributes(bucketName, log, cb) {
this.client.getBucketAttributes(bucketName, log.getSerializedUids(),
(err, data) => {
if (err) {
return cb(err);
}
return cb(err, BucketInfo.deSerialize(data));
});
return null;
}
getBucketAndObject(bucketName, objName, params, log, cb) {
this.client.getBucketAndObject(bucketName, objName,
log.getSerializedUids(), (err, data) => {
if (err && (!err.NoSuchKey && !err.ObjNotFound)) {
return cb(err);
}
return cb(null, JSON.parse(data));
}, params);
return null;
}
getRaftBuckets(raftId, log, cb) {
return this.client.getRaftBuckets(raftId, log.getSerializedUids(),
(err, data) => {
if (err) {
return cb(err);
}
return cb(null, JSON.parse(data));
});
}
putBucketAttributes(bucketName, bucketMD, log, cb) {
this.client.putBucketAttributes(bucketName, log.getSerializedUids(),
bucketMD.serialize(), cb);
return null;
}
deleteBucket(bucketName, log, cb) {
this.client.deleteBucket(bucketName, log.getSerializedUids(), cb);
return null;
}
putObject(bucketName, objName, objVal, params, log, cb) {
this.client.putObject(bucketName, objName, JSON.stringify(objVal),
log.getSerializedUids(), cb, params);
return null;
}
getObject(bucketName, objName, params, log, cb) {
this.client.getObject(bucketName, objName, log.getSerializedUids(),
(err, data) => {
if (err) {
return cb(err);
}
return cb(err, JSON.parse(data));
}, params);
return null;
}
deleteObject(bucketName, objName, params, log, cb) {
this.client.deleteObject(bucketName, objName, log.getSerializedUids(),
cb, params);
return null;
}
listObject(bucketName, params, log, cb) {
this.client.listObject(bucketName, log.getSerializedUids(), params,
(err, data) => {
if (err) {
return cb(err);
}
return cb(err, JSON.parse(data));
});
return null;
}
listMultipartUploads(bucketName, params, log, cb) {
this.client.listObject(bucketName, log.getSerializedUids(), params,
(err, data) => {
if (err) {
return cb(err);
}
return cb(null, JSON.parse(data));
});
return null;
}
_analyzeHealthFailure(log, callback) {
let doFail = false;
const reason = {
msg: 'Map is available and failure ratio is acceptable',
};
// The healthCheck exposed by Bucketd is a light one, we need
// to inspect all the RaftSession's statuses to make sense of
// it:
return this.client.getAllRafts(undefined, (error, payload) => {
let statuses = null;
try {
statuses = JSON.parse(payload);
} catch (e) {
doFail = true;
reason.msg = 'could not interpret status: invalid payload';
// Can't do anything anymore if we fail here. return.
return callback(doFail, reason);
}
const reducer = (acc, payload) => acc + !payload.connectedToLeader;
reason.ratio = statuses.reduce(reducer, 0) / statuses.length;
/* NOTE FIXME/TODO: acceptableRatio could be configured later on */
reason.acceptableRatio = 0.5;
/* If the RaftSession 0 (map) does not work, fail anyways */
if (!doFail && !statuses[0].connectedToLeader) {
doFail = true;
reason.msg = 'Bucket map unavailable';
}
if (!doFail && reason.ratio > reason.acceptableRatio) {
doFail = true;
reason.msg = 'Ratio of failing Raft Sessions is too high';
}
return callback(doFail, reason);
}, log);
}
/*
* Bucketd offers a behavior that diverges from other sub-components on the
* healthCheck: If any of the pieces making up the bucket storage fail (ie:
* if any Raft Session is down), bucketd returns a 500 for the healthCheck.
*
* As seen in S3C-1412, this may become an issue for S3, whenever the
* system is only partly failing.
*
* This means that S3 needs to analyze the situation, and interpret this
* status depending on the analysis. S3 will then assess the situation as
* critical (and consider it a failure), or not (and consider it a success
* anyways, thus not diverging from the healthCheck behavior of other
* components).
*/
checkHealth(implName, log, cb) {
return this.client.healthcheck(log, (err, result) => {
const respBody = {};
if (err) {
return this._analyzeHealthFailure(log, (failure, reason) => {
const message = reason.msg;
// Remove 'msg' from the reason payload.
// eslint-disable-next-line no-param-reassign
reason.msg = undefined;
respBody[implName] = {
code: 200,
message, // Provide interpreted reason msg
body: reason, // Provide analysis data
};
if (failure) {
// Setting the `error` field is how the healthCheck
// logic interprets it as an error. Don't forget it !
respBody[implName].error = err;
respBody[implName].code = err.code; // original error
}
// error returned as null so async parallel doesn't return
// before all backends are checked
return cb(null, respBody);
}, log);
}
const parseResult = JSON.parse(result);
respBody[implName] = {
code: 200,
message: 'OK',
body: parseResult,
};
return cb(null, respBody);
});
}
}
module.exports = BucketClientInterface;

View File

@ -1,384 +0,0 @@
const cluster = require('cluster');
const arsenal = require('arsenal');
const logger = require('../../utilities/logger');
const BucketInfo = arsenal.models.BucketInfo;
const constants = require('../../../constants');
const { config } = require('../../Config');
const errors = arsenal.errors;
const MetadataFileClient = arsenal.storage.metadata.file.MetadataFileClient;
const versionSep = arsenal.versioning.VersioningConstants.VersionId.Separator;
const METASTORE = '__metastore';
class BucketFileInterface {
/**
* @constructor
* @param {object} [params] - constructor params
* @param {boolean} [params.noDbOpen=false] - true to skip DB open
* (for unit tests only)
*/
constructor(params) {
this.logger = logger;
const { host, port } = config.metadataClient;
this.mdClient = new MetadataFileClient({ host, port });
if (params && params.noDbOpen) {
return;
}
this.mdClient.openDB((err, mdDB) => {
if (err) {
throw err;
}
this.mdDB = mdDB;
// the metastore sublevel is used to store bucket attributes
this.metastore = this.mdDB.openSub(METASTORE);
if (cluster.isMaster) {
this.setupMetadataServer();
}
});
}
setupMetadataServer() {
/* Since the bucket creation API is expecting the
usersBucket to have attributes, we pre-create the
usersBucket attributes here */
this.mdClient.logger.debug('setting up metadata server');
const usersBucketAttr = new BucketInfo(constants.usersBucket,
'admin', 'admin', new Date().toJSON(),
BucketInfo.currentModelVersion());
this.metastore.put(
constants.usersBucket,
usersBucketAttr.serialize(), {}, err => {
if (err) {
this.logger.fatal('error writing usersBucket ' +
'attributes to metadata',
{ error: err });
throw (errors.InternalError);
}
});
}
/**
* Load DB if exists
* @param {String} bucketName - name of bucket
* @param {Object} log - logger
* @param {function} cb - callback(err, db, attr)
* @return {undefined}
*/
loadDBIfExists(bucketName, log, cb) {
this.getBucketAttributes(bucketName, log, (err, attr) => {
if (err) {
return cb(err);
}
try {
const db = this.mdDB.openSub(bucketName);
return cb(null, db, attr);
} catch (err) {
return cb(errors.InternalError);
}
});
return undefined;
}
createBucket(bucketName, bucketMD, log, cb) {
this.getBucketAttributes(bucketName, log, err => {
if (err && err !== errors.NoSuchBucket) {
return cb(err);
}
if (err === undefined) {
return cb(errors.BucketAlreadyExists);
}
this.putBucketAttributes(bucketName,
bucketMD,
log, cb);
return undefined;
});
}
getBucketAttributes(bucketName, log, cb) {
this.metastore
.withRequestLogger(log)
.get(bucketName, {}, (err, data) => {
if (err) {
if (err.ObjNotFound) {
return cb(errors.NoSuchBucket);
}
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error getting db attributes', logObj);
return cb(errors.InternalError);
}
return cb(null, BucketInfo.deSerialize(data));
});
return undefined;
}
getBucketAndObject(bucketName, objName, params, log, cb) {
this.loadDBIfExists(bucketName, log, (err, db, bucketAttr) => {
if (err) {
return cb(err);
}
db.withRequestLogger(log)
.get(objName, params, (err, objAttr) => {
if (err) {
if (err.ObjNotFound) {
return cb(null, {
bucket: bucketAttr.serialize(),
});
}
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error getting object', logObj);
return cb(errors.InternalError);
}
return cb(null, {
bucket: bucketAttr.serialize(),
obj: objAttr,
});
});
return undefined;
});
return undefined;
}
putBucketAttributes(bucketName, bucketMD, log, cb) {
this.metastore
.withRequestLogger(log)
.put(bucketName, bucketMD.serialize(), {}, err => {
if (err) {
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error putting db attributes', logObj);
return cb(errors.InternalError);
}
return cb();
});
return undefined;
}
deleteBucket(bucketName, log, cb) {
this.metastore
.withRequestLogger(log)
.del(bucketName, {}, err => {
if (err) {
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error deleting bucket',
logObj);
return cb(errors.InternalError);
}
return cb();
});
return undefined;
}
putObject(bucketName, objName, objVal, params, log, cb) {
this.loadDBIfExists(bucketName, log, (err, db) => {
if (err) {
return cb(err);
}
// Ignore the PUT done by AbortMPU
if (params && params.isAbort) {
return cb();
}
db.withRequestLogger(log)
.put(objName, JSON.stringify(objVal), params, (err, data) => {
if (err) {
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error putting object', logObj);
return cb(errors.InternalError);
}
return cb(err, data);
});
return undefined;
});
}
getObject(bucketName, objName, params, log, cb) {
this.loadDBIfExists(bucketName, log, (err, db) => {
if (err) {
return cb(err);
}
db.withRequestLogger(log).get(objName, params, (err, data) => {
if (err) {
if (err.ObjNotFound) {
return cb(errors.NoSuchKey);
}
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error getting object', logObj);
return cb(errors.InternalError);
}
return cb(null, JSON.parse(data));
});
return undefined;
});
}
deleteObject(bucketName, objName, params, log, cb) {
this.loadDBIfExists(bucketName, log, (err, db) => {
if (err) {
return cb(err);
}
db.withRequestLogger(log).del(objName, params, err => {
if (err) {
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error deleting object', logObj);
return cb(errors.InternalError);
}
return cb();
});
return undefined;
});
}
/**
* This complex function deals with different extensions of bucket listing:
* Delimiter based search or MPU based search.
* @param {String} bucketName - The name of the bucket to list
* @param {Object} params - The params to search
* @param {Object} log - The logger object
* @param {function} cb - Callback when done
* @return {undefined}
*/
internalListObject(bucketName, params, log, cb) {
const extName = params.listingType;
const extension = new arsenal.algorithms.list[extName](params, log);
const requestParams = extension.genMDParams();
this.loadDBIfExists(bucketName, log, (err, db) => {
if (err) {
return cb(err);
}
let cbDone = false;
db.withRequestLogger(log)
.createReadStream(requestParams, (err, stream) => {
if (err) {
return cb(err);
}
stream
.on('data', e => {
if (extension.filter(e) < 0) {
stream.emit('end');
stream.destroy();
}
})
.on('error', err => {
if (!cbDone) {
cbDone = true;
const logObj = {
rawError: err,
error: err.message,
errorStack: err.stack,
};
log.error('error listing objects', logObj);
cb(errors.InternalError);
}
})
.on('end', () => {
if (!cbDone) {
cbDone = true;
const data = extension.result();
cb(null, data);
}
});
return undefined;
});
return undefined;
});
}
listObject(bucketName, params, log, cb) {
return this.internalListObject(bucketName, params, log, cb);
}
listMultipartUploads(bucketName, params, log, cb) {
return this.internalListObject(bucketName, params, log, cb);
}
getUUID(log, cb) {
return this.mdDB.getUUID(cb);
}
getDiskUsage(cb) {
return this.mdDB.getDiskUsage(cb);
}
countItems(log, cb) {
const params = {};
const extension = new arsenal.algorithms.list.Basic(params, log);
const requestParams = extension.genMDParams();
const res = {
objects: 0,
versions: 0,
buckets: 0,
};
let cbDone = false;
this.mdDB.rawListKeys(requestParams, (err, stream) => {
if (err) {
return cb(err);
}
stream
.on('data', e => {
if (!e.includes(METASTORE)) {
if (e.includes(constants.usersBucket)) {
res.buckets++;
} else if (e.includes(versionSep)) {
res.versions++;
} else {
res.objects++;
}
}
})
.on('error', err => {
if (!cbDone) {
cbDone = true;
const logObj = {
error: err,
errorMessage: err.message,
errorStack: err.stack,
};
log.error('error listing objects', logObj);
cb(errors.InternalError);
}
})
.on('end', () => {
if (!cbDone) {
cbDone = true;
return cb(null, res);
}
return undefined;
});
return undefined;
});
return undefined;
}
}
module.exports = BucketFileInterface;

View File

@ -1,34 +0,0 @@
const ListResult = require('./ListResult');
class ListMultipartUploadsResult extends ListResult {
constructor() {
super();
this.Uploads = [];
this.NextKeyMarker = undefined;
this.NextUploadIdMarker = undefined;
}
addUpload(uploadInfo) {
this.Uploads.push({
key: decodeURIComponent(uploadInfo.key),
value: {
UploadId: uploadInfo.uploadId,
Initiator: {
ID: uploadInfo.initiatorID,
DisplayName: uploadInfo.initiatorDisplayName,
},
Owner: {
ID: uploadInfo.ownerID,
DisplayName: uploadInfo.ownerDisplayName,
},
StorageClass: uploadInfo.storageClass,
Initiated: uploadInfo.initiated,
},
});
this.MaxKeys += 1;
}
}
module.exports = {
ListMultipartUploadsResult,
};

View File

@ -1,27 +0,0 @@
class ListResult {
constructor() {
this.IsTruncated = false;
this.NextMarker = undefined;
this.CommonPrefixes = [];
/*
Note: this.MaxKeys will get incremented as
keys are added so that when response is returned,
this.MaxKeys will equal total keys in response
(with each CommonPrefix counting as 1 key)
*/
this.MaxKeys = 0;
}
addCommonPrefix(prefix) {
if (!this.hasCommonPrefix(prefix)) {
this.CommonPrefixes.push(prefix);
this.MaxKeys += 1;
}
}
hasCommonPrefix(prefix) {
return (this.CommonPrefixes.indexOf(prefix) !== -1);
}
}
module.exports = ListResult;

View File

@ -1,335 +0,0 @@
const { errors, algorithms, versioning } = require('arsenal');
const getMultipartUploadListing = require('./getMultipartUploadListing');
const { metadata } = require('./metadata');
const { config } = require('../../Config');
const genVID = versioning.VersionID.generateVersionId;
const defaultMaxKeys = 1000;
let uidCounter = 0;
function generateVersionId() {
return genVID(uidCounter++, config.replicationGroupId);
}
function formatVersionKey(key, versionId) {
return `${key}\0${versionId}`;
}
function inc(str) {
return str ? (str.slice(0, str.length - 1) +
String.fromCharCode(str.charCodeAt(str.length - 1) + 1)) : str;
}
const metastore = {
createBucket: (bucketName, bucketMD, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, (err, bucket) => {
// TODO Check whether user already owns the bucket,
// if so return "BucketAlreadyOwnedByYou"
// If not owned by user, return "BucketAlreadyExists"
if (bucket) {
return cb(errors.BucketAlreadyExists);
}
metadata.buckets.set(bucketName, bucketMD);
metadata.keyMaps.set(bucketName, new Map);
return cb();
});
});
},
putBucketAttributes: (bucketName, bucketMD, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
metadata.buckets.set(bucketName, bucketMD);
return cb();
});
});
},
getBucketAttributes: (bucketName, log, cb) => {
process.nextTick(() => {
if (!metadata.buckets.has(bucketName)) {
return cb(errors.NoSuchBucket);
}
return cb(null, metadata.buckets.get(bucketName));
});
},
deleteBucket: (bucketName, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
if (metadata.keyMaps.has(bucketName)
&& metadata.keyMaps.get(bucketName).length > 0) {
return cb(errors.BucketNotEmpty);
}
metadata.buckets.delete(bucketName);
metadata.keyMaps.delete(bucketName);
return cb(null);
});
});
},
putObject: (bucketName, objName, objVal, params, log, cb) => {
process.nextTick(() => {
// Ignore the PUT done by AbortMPU
if (params && params.isAbort) {
return cb(null);
}
return metastore.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
/*
valid combinations of versioning options:
- !versioning && !versionId: normal non-versioning put
- versioning && !versionId: create a new version
- versionId: update (PUT/DELETE) an existing version,
and also update master version in case the put
version is newer or same version than master.
if versionId === '' update master version
*/
if (params && params.versionId) {
objVal.versionId = params.versionId; // eslint-disable-line
const mst = metadata.keyMaps.get(bucketName).get(objName);
if (mst && mst.versionId === params.versionId || !mst) {
metadata.keyMaps.get(bucketName).set(objName, objVal);
}
// eslint-disable-next-line
objName = formatVersionKey(objName, params.versionId);
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null, `{"versionId":"${objVal.versionId}"}`);
}
if (params && params.versioning) {
const versionId = generateVersionId();
objVal.versionId = versionId; // eslint-disable-line
metadata.keyMaps.get(bucketName).set(objName, objVal);
// eslint-disable-next-line
objName = formatVersionKey(objName, versionId);
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null, `{"versionId":"${versionId}"}`);
}
if (params && params.versionId === '') {
const versionId = generateVersionId();
objVal.versionId = versionId; // eslint-disable-line
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null, `{"versionId":"${objVal.versionId}"}`);
}
metadata.keyMaps.get(bucketName).set(objName, objVal);
return cb(null);
});
});
},
getBucketAndObject: (bucketName, objName, params, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, (err, bucket) => {
if (err) {
return cb(err, { bucket });
}
if (params && params.versionId) {
// eslint-disable-next-line
objName = formatVersionKey(objName, params.versionId);
}
if (!metadata.keyMaps.has(bucketName)
|| !metadata.keyMaps.get(bucketName).has(objName)) {
return cb(null, { bucket: bucket.serialize() });
}
return cb(null, {
bucket: bucket.serialize(),
obj: JSON.stringify(
metadata.keyMaps.get(bucketName).get(objName)
),
});
});
});
},
getObject: (bucketName, objName, params, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
if (params && params.versionId) {
// eslint-disable-next-line
objName = formatVersionKey(objName, params.versionId);
}
if (!metadata.keyMaps.has(bucketName)
|| !metadata.keyMaps.get(bucketName).has(objName)) {
return cb(errors.NoSuchKey);
}
return cb(null, metadata.keyMaps.get(bucketName).get(objName));
});
});
},
deleteObject: (bucketName, objName, params, log, cb) => {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, err => {
if (err) {
return cb(err);
}
if (!metadata.keyMaps.get(bucketName).has(objName)) {
return cb(errors.NoSuchKey);
}
if (params && params.versionId) {
const baseKey = inc(formatVersionKey(objName, ''));
const vobjName = formatVersionKey(objName,
params.versionId);
metadata.keyMaps.get(bucketName).delete(vobjName);
const mst = metadata.keyMaps.get(bucketName).get(objName);
if (mst.versionId === params.versionId) {
const keys = [];
metadata.keyMaps.get(bucketName).forEach((val, key) => {
if (key < baseKey && key > vobjName) {
keys.push(key);
}
});
if (keys.length === 0) {
metadata.keyMaps.get(bucketName).delete(objName);
return cb();
}
const key = keys.sort()[0];
const value = metadata.keyMaps.get(bucketName).get(key);
metadata.keyMaps.get(bucketName).set(objName, value);
}
return cb();
}
metadata.keyMaps.get(bucketName).delete(objName);
return cb();
});
});
},
_hasDeleteMarker(key, keyMap) {
const objectMD = keyMap.get(key);
if (objectMD['x-amz-delete-marker'] !== undefined) {
return (objectMD['x-amz-delete-marker'] === true);
}
return false;
},
listObject(bucketName, params, log, cb) {
process.nextTick(() => {
const {
prefix,
marker,
delimiter,
maxKeys,
continuationToken,
startAfter,
} = params;
if (prefix && typeof prefix !== 'string') {
return cb(errors.InvalidArgument);
}
if (marker && typeof marker !== 'string') {
return cb(errors.InvalidArgument);
}
if (delimiter && typeof delimiter !== 'string') {
return cb(errors.InvalidArgument);
}
if (maxKeys && typeof maxKeys !== 'number') {
return cb(errors.InvalidArgument);
}
if (continuationToken && typeof continuationToken !== 'string') {
return cb(errors.InvalidArgument);
}
if (startAfter && typeof startAfter !== 'string') {
return cb(errors.InvalidArgument);
}
// If paramMaxKeys is undefined, the default parameter will set it.
// However, if it is null, the default parameter will not set it.
let numKeys = maxKeys;
if (numKeys === null) {
numKeys = defaultMaxKeys;
}
if (!metadata.keyMaps.has(bucketName)) {
return cb(errors.NoSuchBucket);
}
// If marker specified, edit the keys array so it
// only contains keys that occur alphabetically after the marker
const listingType = params.listingType;
const extension = new algorithms.list[listingType](params, log);
const listingParams = extension.genMDParams();
const keys = [];
metadata.keyMaps.get(bucketName).forEach((val, key) => {
if (listingParams.gt && listingParams.gt >= key) {
return null;
}
if (listingParams.gte && listingParams.gte > key) {
return null;
}
if (listingParams.lt && key >= listingParams.lt) {
return null;
}
if (listingParams.lte && key > listingParams.lte) {
return null;
}
return keys.push(key);
});
keys.sort();
// Iterate through keys array and filter keys containing
// delimiter into response.CommonPrefixes and filter remaining
// keys into response.Contents
for (let i = 0; i < keys.length; ++i) {
const currentKey = keys[i];
// Do not list object with delete markers
if (this._hasDeleteMarker(currentKey,
metadata.keyMaps.get(bucketName))) {
continue;
}
const objMD = metadata.keyMaps.get(bucketName).get(currentKey);
const value = JSON.stringify(objMD);
const obj = {
key: currentKey,
value,
};
// calling Ext.filter(obj) adds the obj to the Ext result if
// not filtered.
// Also, Ext.filter returns false when hit max keys.
// What a nifty function!
if (extension.filter(obj) < 0) {
break;
}
}
return cb(null, extension.result());
});
},
listMultipartUploads(bucketName, listingParams, log, cb) {
process.nextTick(() => {
metastore.getBucketAttributes(bucketName, log, (err, bucket) => {
if (bucket === undefined) {
// no on going multipart uploads, return empty listing
return cb(null, {
IsTruncated: false,
NextMarker: undefined,
MaxKeys: 0,
});
}
return getMultipartUploadListing(bucket, listingParams, cb);
});
});
},
};
module.exports = metastore;

View File

@ -1,62 +0,0 @@
# bucket_mem design
## RATIONALE
The bucket API will be used for managing buckets behind the S3 interface.
We plan to have only 2 backends using this interface:
* One production backend
* One debug backend purely in memory
One important remark here is that we don't want an abstraction but a
duck-typing style interface (different classes MemoryBucket and Bucket having
the same methods putObjectMD(), getObjectMD(), etc).
Notes about the memory backend: The backend is currently a simple key/value
store in memory. The functions actually use nextTick() to emulate the future
asynchronous behavior of the production backend.
## BUCKET API
The bucket API is a very simple API with 5 functions:
- putObjectMD(): put metadata for an object in the bucket
- getObjectMD(): get metadata from the bucket
- deleteObjectMD(): delete metadata for an object from the bucket
- deleteBucketMD(): delete a bucket
- getBucketListObjects(): perform the complex bucket listing AWS search
function with various flavors. This function returns a response in a
ListBucketResult object.
getBucketListObjects(prefix, marker, delimiter, maxKeys, callback) behavior is
the following:
prefix (not required): Limits the response to keys that begin with the
specified prefix. You can use prefixes to separate a bucket into different
groupings of keys. (You can think of using prefix to make groups in the same
way you'd use a folder in a file system.)
marker (not required): Specifies the key to start with when listing objects in
a bucket. Amazon S3 returns object keys in alphabetical order, starting with
key after the marker in order.
delimiter (not required): A delimiter is a character you use to group keys.
All keys that contain the same string between the prefix, if specified, and the
first occurrence of the delimiter after the prefix are grouped under a single
result element, CommonPrefixes. If you don't specify the prefix parameter, then
the substring starts at the beginning of the key. The keys that are grouped
under CommonPrefixes are not returned elsewhere in the response.
maxKeys: Sets the maximum number of keys returned in the response body. You can
add this to your request if you want to retrieve fewer than the default 1000
keys. The response might contain fewer keys but will never contain more. If
there are additional keys that satisfy the search criteria but were not
returned because maxKeys was exceeded, the response contains an attribute of
IsTruncated set to true and a NextMarker. To return the additional keys, call
the function again using NextMarker as your marker argument in the function.
Any key that does not contain the delimiter will be returned individually in
Contents rather than in CommonPrefixes.
If there is an error, the error subfield is returned in the response.

View File

@ -1,51 +0,0 @@
function markerFilterMPU(allMarkers, array) {
const { keyMarker, uploadIdMarker } = allMarkers;
for (let i = 0; i < array.length; i++) {
// If the keyMarker is the same as the key,
// check the uploadIdMarker. If uploadIdMarker is the same
// as or alphabetically after the uploadId of the item,
// eliminate the item.
if (uploadIdMarker && keyMarker === array[i].key) {
const laterId =
[uploadIdMarker, array[i].uploadId].sort()[1];
if (array[i].uploadId === laterId) {
break;
} else {
array.shift();
i--;
}
} else {
// If the keyMarker is alphabetically after the key
// of the item in the array, eliminate the item from the array.
const laterItem =
[keyMarker, array[i].key].sort()[1];
if (keyMarker === array[i].key || keyMarker === laterItem) {
array.shift();
i--;
} else {
break;
}
}
}
return array;
}
function prefixFilter(prefix, array) {
for (let i = 0; i < array.length; i++) {
if (array[i].indexOf(prefix) !== 0) {
array.splice(i, 1);
i--;
}
}
return array;
}
function isKeyInContents(responseObject, key) {
return responseObject.Contents.some(val => val.key === key);
}
module.exports = {
markerFilterMPU,
prefixFilter,
isKeyInContents,
};

View File

@ -1,148 +0,0 @@
const { errors } = require('arsenal');
const { markerFilterMPU, prefixFilter } = require('./bucket_utilities');
const { ListMultipartUploadsResult } = require('./ListMultipartUploadsResult');
const { metadata } = require('./metadata');
const defaultMaxKeys = 1000;
function getMultipartUploadListing(bucket, params, callback) {
const { delimiter, keyMarker,
uploadIdMarker, prefix, queryPrefixLength, splitter } = params;
const splitterLen = splitter.length;
const maxKeys = params.maxKeys !== undefined ?
Number.parseInt(params.maxKeys, 10) : defaultMaxKeys;
const response = new ListMultipartUploadsResult();
const keyMap = metadata.keyMaps.get(bucket.getName());
if (prefix) {
response.Prefix = prefix;
if (typeof prefix !== 'string') {
return callback(errors.InvalidArgument);
}
}
if (keyMarker) {
response.KeyMarker = keyMarker;
if (typeof keyMarker !== 'string') {
return callback(errors.InvalidArgument);
}
}
if (uploadIdMarker) {
response.UploadIdMarker = uploadIdMarker;
if (typeof uploadIdMarker !== 'string') {
return callback(errors.InvalidArgument);
}
}
if (delimiter) {
response.Delimiter = delimiter;
if (typeof delimiter !== 'string') {
return callback(errors.InvalidArgument);
}
}
if (maxKeys && typeof maxKeys !== 'number') {
return callback(errors.InvalidArgument);
}
// Sort uploads alphatebetically by objectKey and if same objectKey,
// then sort in ascending order by time initiated
let uploads = [];
keyMap.forEach((val, key) => {
uploads.push(key);
});
uploads.sort((a, b) => {
const aIndex = a.indexOf(splitter);
const bIndex = b.indexOf(splitter);
const aObjectKey = a.substring(aIndex + splitterLen);
const bObjectKey = b.substring(bIndex + splitterLen);
const aInitiated = keyMap.get(a).initiated;
const bInitiated = keyMap.get(b).initiated;
if (aObjectKey === bObjectKey) {
if (Date.parse(aInitiated) >= Date.parse(bInitiated)) {
return 1;
}
if (Date.parse(aInitiated) < Date.parse(bInitiated)) {
return -1;
}
}
return (aObjectKey < bObjectKey) ? -1 : 1;
});
// Edit the uploads array so it only
// contains keys that contain the prefix
uploads = prefixFilter(prefix, uploads);
uploads = uploads.map(stringKey => {
const index = stringKey.indexOf(splitter);
const index2 = stringKey.indexOf(splitter, index + splitterLen);
const storedMD = keyMap.get(stringKey);
return {
key: stringKey.substring(index + splitterLen, index2),
uploadId: stringKey.substring(index2 + splitterLen),
bucket: storedMD.eventualStorageBucket,
initiatorID: storedMD.initiator.ID,
initiatorDisplayName: storedMD.initiator.DisplayName,
ownerID: storedMD['owner-id'],
ownerDisplayName: storedMD['owner-display-name'],
storageClass: storedMD['x-amz-storage-class'],
initiated: storedMD.initiated,
};
});
// If keyMarker specified, edit the uploads array so it
// only contains keys that occur alphabetically after the marker.
// If there is also an uploadIdMarker specified, filter to eliminate
// any uploads that share the keyMarker and have an uploadId before
// the uploadIdMarker.
if (keyMarker) {
const allMarkers = {
keyMarker,
uploadIdMarker,
};
uploads = markerFilterMPU(allMarkers, uploads);
}
// Iterate through uploads and filter uploads
// with keys containing delimiter
// into response.CommonPrefixes and filter remaining uploads
// into response.Uploads
for (let i = 0; i < uploads.length; i++) {
const currentUpload = uploads[i];
// If hit maxKeys, stop adding keys to response
if (response.MaxKeys >= maxKeys) {
response.IsTruncated = true;
break;
}
// If a delimiter is specified, find its
// index in the current key AFTER THE OCCURRENCE OF THE PREFIX
// THAT WAS SENT IN THE QUERY (not the prefix including the splitter
// and other elements)
let delimiterIndexAfterPrefix = -1;
const currentKeyWithoutPrefix =
currentUpload.key.slice(queryPrefixLength);
let sliceEnd;
if (delimiter) {
delimiterIndexAfterPrefix = currentKeyWithoutPrefix
.indexOf(delimiter);
sliceEnd = delimiterIndexAfterPrefix + queryPrefixLength;
}
// If delimiter occurs in current key, add key to
// response.CommonPrefixes.
// Otherwise add upload to response.Uploads
if (delimiterIndexAfterPrefix > -1) {
const keySubstring = currentUpload.key.slice(0, sliceEnd + 1);
response.addCommonPrefix(keySubstring);
} else {
response.NextKeyMarker = currentUpload.key;
response.NextUploadIdMarker = currentUpload.uploadId;
response.addUpload(currentUpload);
}
}
// `response.MaxKeys` should be the value from the original `MaxUploads`
// parameter specified by the user (or else the default 1000). Redefine it
// here, so it does not equal the value of `uploads.length`.
response.MaxKeys = maxKeys;
// If `response.MaxKeys` is 0, `response.IsTruncated` should be `false`.
response.IsTruncated = maxKeys === 0 ? false : response.IsTruncated;
return callback(null, response);
}
module.exports = getMultipartUploadListing;

View File

@ -1,8 +0,0 @@
const metadata = {
buckets: new Map,
keyMaps: new Map,
};
module.exports = {
metadata,
};

View File

@ -7,59 +7,6 @@ const { isBucketAuthorized, isObjAuthorized } =
require('../api/apiUtils/authorization/permissionChecks');
const bucketShield = require('../api/apiUtils/bucket/bucketShield');
/** _parseListEntries - parse the values returned in a listing by metadata
* @param {object[]} entries - Version or Content entries in a metadata listing
* @param {string} entries[].key - metadata key
* @param {string} entries[].value - stringified object metadata
* @return {object} - mapped array with parsed value or JSON parsing err
*/
function _parseListEntries(entries) {
return entries.map(entry => {
if (typeof entry.value === 'string') {
const tmp = JSON.parse(entry.value);
return {
key: entry.key,
value: {
Size: tmp['content-length'],
ETag: tmp['content-md5'],
VersionId: tmp.versionId,
IsNull: tmp.isNull,
IsDeleteMarker: tmp.isDeleteMarker,
LastModified: tmp['last-modified'],
Owner: {
DisplayName: tmp['owner-display-name'],
ID: tmp['owner-id'],
},
StorageClass: tmp['x-amz-storage-class'],
// MPU listing properties
Initiated: tmp.initiated,
Initiator: tmp.initiator,
EventualStorageBucket: tmp.eventualStorageBucket,
partLocations: tmp.partLocations,
creationDate: tmp.creationDate,
},
};
}
return entry;
});
}
/** parseListEntries - parse the values returned in a listing by metadata
* @param {object[]} entries - Version or Content entries in a metadata listing
* @param {string} entries[].key - metadata key
* @param {string} entries[].value - stringified object metadata
* @return {(object|Error)} - mapped array with parsed value or JSON parsing err
*/
function parseListEntries(entries) {
// wrap private function in a try/catch clause
// just in case JSON parsing throws an exception
try {
return _parseListEntries(entries);
} catch (e) {
return e;
}
}
/** getNullVersion - return metadata of null version if it exists
* @param {object} objMD - metadata of master version
* @param {string} bucketName - name of bucket
@ -279,7 +226,6 @@ function metadataValidateBucket(params, log, callback) {
}
module.exports = {
parseListEntries,
metadataGetObject,
metadataValidateBucketAndObj,
metadataValidateBucket,

View File

@ -1,252 +1,43 @@
const errors = require('arsenal').errors;
const BucketClientInterface = require('./bucketclient/backend');
const BucketFileInterface = require('./bucketfile/backend');
const BucketInfo = require('arsenal').models.BucketInfo;
const inMemory = require('./in_memory/backend');
const MetadataWrapper = require('arsenal').storage.metadata.MetadataWrapper;
const { config } = require('../Config');
const logger = require('../utilities/logger');
const constants = require('../../constants');
const bucketclient = require('bucketclient');
let CdmiMetadata;
try {
CdmiMetadata = require('cdmiclient').CdmiMetadata;
} catch (err) {
CdmiMetadata = null;
const clientName = config.backends.metadata;
let params;
if (clientName === 'mem') {
params = {};
} else if (clientName === 'file') {
params = {
metadataClient: {
host: config.metadataClient.host,
port: config.metadataClient.port,
},
constants: {
usersBucket: constants.usersBucket,
splitter: constants.splitter,
},
noDbOpen: null,
};
} else if (clientName === 'scality') {
params = {
bucketdBootstrap: config.bucketd.bootstrap,
bucketdLog: config.bucketd.log,
https: config.https,
};
} else if (clientName === 'mongodb') {
params = {
mongodb: config.mongodb,
replicationGroupId: config.replicationGroupId,
config,
};
} else if (clientName === 'cdmi') {
params = {
cdmi: config.cdmi,
};
}
let client;
let implName;
if (config.backends.metadata === 'mem') {
client = inMemory;
implName = 'memorybucket';
} else if (config.backends.metadata === 'file') {
client = new BucketFileInterface();
implName = 'bucketfile';
} else if (config.backends.metadata === 'scality') {
client = new BucketClientInterface();
implName = 'bucketclient';
} else if (config.backends.metadata === 'cdmi') {
if (!CdmiMetadata) {
throw new Error('Unauthorized backend');
}
client = new CdmiMetadata({
path: config.cdmi.path,
host: config.cdmi.host,
port: config.cdmi.port,
readonly: config.cdmi.readonly,
});
implName = 'cdmi';
}
const metadata = {
createBucket: (bucketName, bucketMD, log, cb) => {
log.debug('creating bucket in metadata');
client.createBucket(bucketName, bucketMD, log, err => {
if (err) {
log.debug('error from metadata', { implName, error: err });
return cb(err);
}
log.trace('bucket created in metadata');
return cb(err);
});
},
updateBucket: (bucketName, bucketMD, log, cb) => {
log.debug('updating bucket in metadata');
client.putBucketAttributes(bucketName, bucketMD, log, err => {
if (err) {
log.debug('error from metadata', { implName, error: err });
return cb(err);
}
log.trace('bucket updated in metadata');
return cb(err);
});
},
getBucket: (bucketName, log, cb) => {
log.debug('getting bucket from metadata');
client.getBucketAttributes(bucketName, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName, error: err });
return cb(err);
}
log.trace('bucket retrieved from metadata');
return cb(err, BucketInfo.fromObj(data));
});
},
deleteBucket: (bucketName, log, cb) => {
log.debug('deleting bucket from metadata');
client.deleteBucket(bucketName, log, err => {
if (err) {
log.debug('error from metadata', { implName, error: err });
return cb(err);
}
log.debug('Deleted bucket from Metadata');
return cb(err);
});
},
putObjectMD: (bucketName, objName, objVal, params, log, cb) => {
log.debug('putting object in metadata');
const value = typeof objVal.getValue === 'function' ?
objVal.getValue() : objVal;
client.putObject(bucketName, objName, value, params, log,
(err, data) => {
if (err) {
log.debug('error from metadata', { implName, error: err });
return cb(err);
}
if (data) {
log.debug('object version successfully put in metadata',
{ version: data });
} else {
log.debug('object successfully put in metadata');
}
return cb(err, data);
});
},
getBucketAndObjectMD: (bucketName, objName, params, log, cb) => {
log.debug('getting bucket and object from metadata',
{ database: bucketName, object: objName });
client.getBucketAndObject(bucketName, objName, params, log,
(err, data) => {
if (err) {
log.debug('error from metadata', { implName, err });
return cb(err);
}
log.debug('bucket and object retrieved from metadata',
{ database: bucketName, object: objName });
return cb(err, data);
});
},
getObjectMD: (bucketName, objName, params, log, cb) => {
log.debug('getting object from metadata');
client.getObject(bucketName, objName, params, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName, err });
return cb(err);
}
log.debug('object retrieved from metadata');
return cb(err, data);
});
},
deleteObjectMD: (bucketName, objName, params, log, cb) => {
log.debug('deleting object from metadata');
client.deleteObject(bucketName, objName, params, log, err => {
if (err) {
log.debug('error from metadata', { implName, err });
return cb(err);
}
log.debug('object deleted from metadata');
return cb(err);
});
},
listObject: (bucketName, listingParams, log, cb) => {
const metadataUtils = require('./metadataUtils');
if (listingParams.listingType === undefined) {
// eslint-disable-next-line
listingParams.listingType = 'Delimiter';
}
client.listObject(bucketName, listingParams, log, (err, data) => {
log.debug('getting object listing from metadata');
if (err) {
log.debug('error from metadata', { implName, err });
return cb(err);
}
log.debug('object listing retrieved from metadata');
if (listingParams.listingType === 'DelimiterVersions') {
// eslint-disable-next-line
data.Versions = metadataUtils.parseListEntries(data.Versions);
if (data.Versions instanceof Error) {
log.error('error parsing metadata listing', {
error: data.Versions,
listingType: listingParams.listingType,
method: 'listObject',
});
return cb(errors.InternalError);
}
return cb(null, data);
}
// eslint-disable-next-line
data.Contents = metadataUtils.parseListEntries(data.Contents);
if (data.Contents instanceof Error) {
log.error('error parsing metadata listing', {
error: data.Contents,
listingType: listingParams.listingType,
method: 'listObject',
});
return cb(errors.InternalError);
}
return cb(null, data);
});
},
listMultipartUploads: (bucketName, listingParams, log, cb) => {
client.listMultipartUploads(bucketName, listingParams, log,
(err, data) => {
log.debug('getting mpu listing from metadata');
if (err) {
log.debug('error from metadata', { implName, err });
return cb(err);
}
log.debug('mpu listing retrieved from metadata');
return cb(err, data);
});
},
switch: newClient => {
client = newClient;
},
getRaftBuckets: (raftId, log, cb) => {
if (!client.getRaftBuckets) {
return cb();
}
return client.getRaftBuckets(raftId, log, cb);
},
checkHealth: (log, cb) => {
if (!client.checkHealth) {
const defResp = {};
defResp[implName] = { code: 200, message: 'OK' };
return cb(null, defResp);
}
return client.checkHealth(implName, log, cb);
},
getUUID: (log, cb) => {
if (!client.getUUID) {
log.debug('returning empty uuid as fallback', { implName });
return cb(null, '');
}
return client.getUUID(log, cb);
},
getDiskUsage: (log, cb) => {
if (!client.getDiskUsage) {
log.debug('returning empty disk usage as fallback', { implName });
return cb(null, {});
}
return client.getDiskUsage(cb);
},
countItems: (log, cb) => {
if (!client.countItems) {
log.debug('returning zero item counts as fallback', { implName });
return cb(null, {
buckets: 0,
objects: 0,
versions: 0,
});
}
return client.countItems(log, cb);
},
};
const metadata = new MetadataWrapper(config.backends.metadata, params,
bucketclient, logger);
module.exports = metadata;

View File

@ -1,14 +1,24 @@
const url = require('url');
const async = require('async');
const httpProxy = require('http-proxy');
const querystring = require('querystring');
const { auth, errors, s3middleware } = require('arsenal');
const { responseJSONBody } = require('arsenal').s3routes.routesUtils;
const backbeatProxy = httpProxy.createProxyServer({
ignorePath: true,
});
const { auth, errors, s3middleware, s3routes, models, storage } =
require('arsenal');
const { responseJSONBody } = s3routes.routesUtils;
const { getSubPartIds } = s3middleware.azureHelper.mpuUtils;
const { skipMpuPartProcessing } = storage.data.external.backendUtils;
const { parseLC, MultipleBackendGateway } = storage.data;
const vault = require('../auth/vault');
const data = require('../data/wrapper');
const dataWrapper = require('../data/wrapper');
const metadata = require('../metadata/wrapper');
const locationConstraintCheck = require(
'../api/apiUtils/object/locationConstraintCheck');
const locationStorageCheck =
require('../api/apiUtils/object/locationStorageCheck');
const { dataStore } = require('../api/apiUtils/object/storeObject');
const prepareRequestContexts = require(
'../api/apiUtils/authorization/prepareRequestContexts');
@ -17,10 +27,9 @@ const locationKeysHaveChanged
= require('../api/apiUtils/object/locationKeysHaveChanged');
const { metadataValidateBucketAndObj,
metadataGetObject } = require('../metadata/metadataUtils');
const { BackendInfo } = require('../api/apiUtils/object/BackendInfo');
const { locationConstraints } = require('../Config').config;
const multipleBackendGateway = require('../data/multipleBackendGateway');
const { config } = require('../Config');
const constants = require('../../constants');
const { BackendInfo } = models;
const { pushReplicationMetric } = require('./utilities/pushReplicationMetric');
const kms = require('../kms/wrapper');
@ -29,6 +38,18 @@ auth.setHandler(vault);
const NAMESPACE = 'default';
const CIPHER = null; // replication/lifecycle does not work on encrypted objects
let { locationConstraints } = config;
const { implName } = dataWrapper;
let dataClient = dataWrapper.client;
config.on('location-constraints-update', () => {
locationConstraints = config.locationConstraints;
if (implName === 'multipleBackends') {
const clients = parseLC(config, vault);
dataClient = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
}
});
function _decodeURI(uri) {
// do the same decoding than in S3 server
return decodeURIComponent(uri.replace(/\+/g, ' '));
@ -47,13 +68,21 @@ function _normalizeBackbeatRequest(req) {
}
function _isObjectRequest(req) {
return ['data', 'metadata', 'multiplebackenddata']
.includes(req.resourceType);
return [
'data',
'metadata',
'multiplebackenddata',
'multiplebackendmetadata',
].includes(req.resourceType);
}
function _respondWithHeaders(response, payload, extraHeaders, log, callback) {
const body = typeof payload === 'object' ?
JSON.stringify(payload) : payload;
let body = '';
if (typeof payload === 'string') {
body = payload;
} else if (typeof payload === 'object') {
body = JSON.stringify(payload);
}
const httpHeaders = Object.assign({
'x-amz-id-2': log.getSerializedUids(),
'x-amz-request-id': log.getSerializedUids(),
@ -94,20 +123,15 @@ function _checkMultipleBackendRequest(request, log) {
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
}
if ((operation === 'initiatempu' || operation === 'putobject') &&
headers['x-scal-version-id'] === undefined) {
errMessage = 'bad request: missing x-scal-version-id header';
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
}
if (operation === 'putpart' &&
headers['x-scal-part-number'] === undefined) {
errMessage = 'bad request: missing part-number header';
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
}
if ((operation === 'putpart' || operation === 'completempu') &&
headers['x-scal-upload-id'] === undefined) {
const isMPUOperation =
['putpart', 'completempu', 'abortmpu'].includes(operation);
if (isMPUOperation && headers['x-scal-upload-id'] === undefined) {
errMessage = 'bad request: missing upload-id header';
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
@ -132,11 +156,6 @@ function _checkMultipleBackendRequest(request, log) {
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
}
if (headers['x-scal-source-version-id'] === undefined) {
errMessage = 'bad request: missing x-scal-source-version-id header';
log.error(errMessage);
return errors.BadRequest.customizeDescription(errMessage);
}
if (headers['x-scal-replication-endpoint-site'] === undefined) {
errMessage = 'bad request: missing ' +
'x-scal-replication-endpoint-site';
@ -156,7 +175,9 @@ function _checkMultipleBackendRequest(request, log) {
return errors.BadRequest.customizeDescription(errMessage);
}
const location = locationConstraints[headers['x-scal-storage-class']];
const isValidLocation = location && storageType.includes(location.type);
const storageTypeList = storageType.split(',');
const isValidLocation = location &&
storageTypeList.includes(location.type);
if (!isValidLocation) {
errMessage = 'invalid request: invalid location constraint in request';
log.debug(errMessage, {
@ -187,9 +208,64 @@ function getPartList(parts, objectKey, uploadId, storageLocation) {
} else {
partList.Part = parts;
}
return partList;
}
function generateMpuAggregateInfo(parts) {
let aggregateSize;
// CopyLocationTask does transmit a size for each part,
// MultipleBackendTask does not, so check if size is defined in
// the first part.
if (parts[0] && parts[0].Size) {
aggregateSize = parts.reduce(
(agg, part) => agg + Number.parseInt(part.Size[0], 10), 0);
}
return {
aggregateSize,
aggregateETag: s3middleware.processMpuParts.createAggregateETag(
parts.map(part => part.ETag[0])),
};
}
/**
* Helper to create the response object for putObject and completeMPU
*
* @param {object} params - response info
* @param {string} params.dataStoreName - name of location
* @param {string} params.dataStoreType - location type (e.g. "aws_s3")
* @param {string} params.key - object key
* @param {number} params.size - total byte length
* @param {string} params.dataStoreETag - object ETag
* @param {string} [params.dataStoreVersionId] - object version ID, if
* versioned
* @return {object} - the response object to serialize and send back
*/
function constructPutResponse(params) {
// FIXME: The main data locations array may eventually resemble
// locations stored in replication info object, i.e. without
// size/start for cloud locations, which could ease passing
// standard location objects across services. For now let's just
// create the location as they are usually stored in the
// "locations" attribute, with size/start info.
const location = [{
dataStoreName: params.dataStoreName,
dataStoreType: params.dataStoreType,
key: params.key,
start: 0,
size: params.size,
dataStoreETag: params.dataStoreETag,
dataStoreVersionId: params.dataStoreVersionId,
}];
return {
// TODO: Remove '' when versioning implemented for Azure.
versionId: params.dataStoreVersionId || '',
location,
};
}
function handleTaggingOperation(request, response, type, dataStoreVersionId,
log, callback) {
const storageLocation = request.headers['x-scal-storage-class'];
@ -206,7 +282,7 @@ function handleTaggingOperation(request, response, type, dataStoreVersionId,
return callback(errors.MalformedPOSTRequest);
}
}
return multipleBackendGateway.objectTagging(type, request.objectKey,
return dataClient.objectTagging(type, request.objectKey,
request.bucketName, objectMD, log, err => {
if (err) {
log.error(`error during object tagging: ${type}`, {
@ -232,6 +308,8 @@ PUT /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=putpart
DELETE /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=deleteobject
DELETE /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=abortmpu
DELETE /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=deleteobjecttagging
POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
@ -240,9 +318,45 @@ POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=completempu
POST /_/backbeat/multiplebackenddata/<bucket name>/<object key>
?operation=puttagging
GET /_/backbeat/multiplebackendmetadata/<bucket name>/<object key>
POST /_/backbeat/batchdelete
*/
function _getLastModified(locations, log, cb) {
const reqUids = log.getSerializedUids();
return dataClient.head(locations, reqUids, (err, data) => {
if (err) {
log.error('head object request failed', {
method: 'headObject',
error: err,
});
return cb(err);
}
return cb(null, data.LastModified || data.lastModified);
});
}
function headObject(request, response, log, cb) {
let locations;
try {
locations = JSON.parse(request.headers['x-scal-locations']);
} catch (e) {
const msg = 'x-scal-locations header is invalid';
return cb(errors.InvalidRequest.customizeDescription(msg));
}
if (!Array.isArray(locations)) {
const msg = 'x-scal-locations header is invalid';
return cb(errors.InvalidRequest.customizeDescription(msg));
}
return _getLastModified(locations, log, (err, lastModified) => {
if (err) {
return cb(err);
}
const dataRetrievalInfo = { lastModified };
return _respond(response, dataRetrievalInfo, log, cb);
});
}
function createCipherBundle(bucketInfo, isV2Request, log, cb) {
// Older backbeat versions do not support encryption (they ignore
// encryption parameters returned), hence we shall not encrypt if
@ -362,9 +476,17 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
// version (updating master as well) but with specified
// versionId
const options = {
versioning: true,
versioning: bucketInfo.isVersioningEnabled(),
versionId: omVal.versionId,
};
// If the object is from a source bucket without versioning (i.e. NFS),
// then we want to create a version for the replica object even though
// none was provided in the object metadata value.
if (omVal.replicationInfo.isNFS) {
const isReplica = omVal.replicationInfo.status === 'REPLICA';
options.versioning = isReplica;
omVal.replicationInfo.isNFS = !isReplica;
}
log.trace('putting object version', {
objectKey: request.objectKey, omVal, options });
return metadata.putObjectMD(bucketName, objectKey, omVal, options, log,
@ -386,7 +508,7 @@ function putMetadata(request, response, bucketInfo, objMd, log, callback) {
objectKey,
});
async.eachLimit(objMd.location, 5,
(loc, next) => data.delete(loc, log, err => {
(loc, next) => dataWrapper.data.delete(loc, log, err => {
if (err) {
log.warn('error removing old data location key', {
bucketName,
@ -426,10 +548,11 @@ function putObject(request, response, log, callback) {
const cacheControl = request.headers['x-scal-cache-control'];
const contentDisposition = request.headers['x-scal-content-disposition'];
const contentEncoding = request.headers['x-scal-content-encoding'];
const metaHeaders = {
'x-amz-meta-scal-replication-status': 'REPLICA',
'x-amz-meta-scal-version-id': sourceVersionId,
};
const tagging = request.headers['x-scal-tags'];
const metaHeaders = { 'x-amz-meta-scal-replication-status': 'REPLICA' };
if (sourceVersionId) {
metaHeaders['x-amz-meta-scal-version-id'] = sourceVersionId;
}
if (userMetadata !== undefined) {
try {
const metaData = JSON.parse(userMetadata);
@ -450,8 +573,17 @@ function putObject(request, response, log, callback) {
contentDisposition,
contentEncoding,
};
if (tagging !== undefined) {
try {
const tags = JSON.parse(request.headers['x-scal-tags']);
context.tagging = querystring.stringify(tags);
} catch (err) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
}
const payloadLen = parseInt(request.headers['content-length'], 10);
const backendInfo = new BackendInfo(storageLocation);
const backendInfo = new BackendInfo(config, storageLocation);
return dataStore(context, CIPHER, request, payloadLen, {}, backendInfo, log,
(err, retrievalInfo, md5) => {
if (err) {
@ -464,11 +596,16 @@ function putObject(request, response, log, callback) {
if (contentMD5 !== md5) {
return callback(errors.BadDigest);
}
const dataRetrievalInfo = {
// TODO: Remove '' when versioning implemented for Azure.
versionId: retrievalInfo.dataStoreVersionId || '',
};
return _respond(response, dataRetrievalInfo, log, callback);
const responsePayload = constructPutResponse({
dataStoreName: retrievalInfo.dataStoreName,
dataStoreType: retrievalInfo.dataStoreType,
key: retrievalInfo.key,
size: payloadLen,
dataStoreETag: retrievalInfo.dataStoreETag ?
`1:${retrievalInfo.dataStoreETag}` : `1:${md5}`,
dataStoreVersionId: retrievalInfo.dataStoreVersionId,
});
return _respond(response, responsePayload, log, callback);
});
}
@ -478,12 +615,17 @@ function deleteObject(request, response, log, callback) {
return callback(err);
}
const storageLocation = request.headers['x-scal-storage-class'];
const objectGetInfo = {
key: request.objectKey,
dataStoreName: storageLocation,
};
const objectGetInfo = dataClient.toObjectGetInfo(
request.objectKey, request.bucketName, storageLocation);
if (!objectGetInfo) {
log.error('error deleting object in multiple backend', {
error: 'cannot create objectGetInfo',
method: 'deleteObject',
});
return callback(errors.InternalError);
}
const reqUids = log.getSerializedUids();
return multipleBackendGateway.delete(objectGetInfo, reqUids, err => {
return dataClient.delete(objectGetInfo, reqUids, err => {
if (err) {
log.error('error deleting object in multiple backend', {
error: err,
@ -514,10 +656,11 @@ function initiateMultipartUpload(request, response, log, callback) {
const cacheControl = request.headers['x-scal-cache-control'];
const contentDisposition = request.headers['x-scal-content-disposition'];
const contentEncoding = request.headers['x-scal-content-encoding'];
const metaHeaders = {
'scal-replication-status': 'REPLICA',
'scal-version-id': sourceVersionId,
};
const tags = request.headers['x-scal-tags'];
const metaHeaders = { 'x-amz-meta-scal-replication-status': 'REPLICA' };
if (sourceVersionId) {
metaHeaders['x-amz-meta-scal-version-id'] = sourceVersionId;
}
if (userMetadata !== undefined) {
try {
const metaData = JSON.parse(userMetadata);
@ -527,9 +670,19 @@ function initiateMultipartUpload(request, response, log, callback) {
return callback(errors.MalformedPOSTRequest);
}
}
return multipleBackendGateway.createMPU(request.objectKey, metaHeaders,
let tagging;
if (tags !== undefined) {
try {
const parsedTags = JSON.parse(request.headers['x-scal-tags']);
tagging = querystring.stringify(parsedTags);
} catch (err) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
}
return dataClient.createMPU(request.objectKey, metaHeaders,
request.bucketName, undefined, storageLocation, contentType,
cacheControl, contentDisposition, contentEncoding, null, log,
cacheControl, contentDisposition, contentEncoding, tagging, log,
(err, data) => {
if (err) {
log.error('error initiating multipart upload', {
@ -545,6 +698,26 @@ function initiateMultipartUpload(request, response, log, callback) {
});
}
function abortMultipartUpload(request, response, log, callback) {
const err = _checkMultipleBackendRequest(request, log);
if (err) {
return callback(err);
}
const storageLocation = request.headers['x-scal-storage-class'];
const uploadId = request.headers['x-scal-upload-id'];
return dataClient.abortMPU(request.objectKey, uploadId,
storageLocation, request.bucketName, log, err => {
if (err) {
log.error('error aborting MPU', {
error: err,
method: 'abortMultipartUpload',
});
return callback(err);
}
return _respond(response, {}, log, callback);
});
}
function putPart(request, response, log, callback) {
const err = _checkMultipleBackendRequest(request, log);
if (err) {
@ -554,7 +727,7 @@ function putPart(request, response, log, callback) {
const partNumber = request.headers['x-scal-part-number'];
const uploadId = request.headers['x-scal-upload-id'];
const payloadLen = parseInt(request.headers['content-length'], 10);
return multipleBackendGateway.uploadPart(undefined, {}, request, payloadLen,
return dataClient.uploadPart(undefined, {}, request, payloadLen,
storageLocation, request.objectKey, uploadId, partNumber,
request.bucketName, log, (err, data) => {
if (err) {
@ -586,6 +759,7 @@ function completeMultipartUpload(request, response, log, callback) {
const cacheControl = request.headers['x-scal-cache-control'];
const contentDisposition = request.headers['x-scal-content-disposition'];
const contentEncoding = request.headers['x-scal-content-encoding'];
const tags = request.headers['x-scal-tags'];
const data = [];
let totalLength = 0;
request.on('data', chunk => {
@ -600,13 +774,13 @@ function completeMultipartUpload(request, response, log, callback) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
const partList =
getPartList(parts, request.objectKey, uploadId, storageLocation);
const partList = getPartList(
parts, request.objectKey, uploadId, storageLocation);
// Azure client will set user metadata at this point.
const metaHeaders = {
'x-amz-meta-scal-replication-status': 'REPLICA',
'x-amz-meta-scal-version-id': sourceVersionId,
};
const metaHeaders = { 'x-amz-meta-scal-replication-status': 'REPLICA' };
if (sourceVersionId) {
metaHeaders['x-amz-meta-scal-version-id'] = sourceVersionId;
}
if (userMetadata !== undefined) {
try {
const metaData = JSON.parse(userMetadata);
@ -616,15 +790,28 @@ function completeMultipartUpload(request, response, log, callback) {
return callback(errors.MalformedPOSTRequest);
}
}
// Azure does not have a notion of initiating an MPU, so we put any
// tagging fields during the complete MPU if using Azure.
let tagging;
if (tags !== undefined) {
try {
const parsedTags = JSON.parse(request.headers['x-scal-tags']);
tagging = querystring.stringify(parsedTags);
} catch (err) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
}
const contentSettings = {
contentType: contentType || undefined,
cacheControl: cacheControl || undefined,
contentDisposition: contentDisposition || undefined,
contentEncoding: contentEncoding || undefined,
};
return multipleBackendGateway.completeMPU(request.objectKey, uploadId,
return dataClient.completeMPU(request.objectKey, uploadId,
storageLocation, partList, undefined, request.bucketName,
metaHeaders, contentSettings, log, (err, retrievalInfo) => {
metaHeaders, contentSettings, tagging, log,
(err, retrievalInfo) => {
if (err) {
log.error('error completing MPU', {
error: err,
@ -632,11 +819,31 @@ function completeMultipartUpload(request, response, log, callback) {
});
return callback(err);
}
const dataRetrievalInfo = {
// TODO: Remove '' when versioning implemented for Azure.
versionId: retrievalInfo.dataStoreVersionId || '',
};
return _respond(response, dataRetrievalInfo, log, callback);
// The logic here is an aggregate of code coming from
// lib/api/completeMultipartUpload.js.
const { key, dataStoreType, dataStoreVersionId } =
retrievalInfo;
let size;
let dataStoreETag;
if (skipMpuPartProcessing(retrievalInfo)) {
size = retrievalInfo.contentLength;
dataStoreETag = retrievalInfo.eTag;
} else {
const { aggregateSize, aggregateETag } =
generateMpuAggregateInfo(parts);
size = aggregateSize;
dataStoreETag = aggregateETag;
}
const responsePayload = constructPutResponse({
dataStoreName: storageLocation,
dataStoreType,
key,
size,
dataStoreETag,
dataStoreVersionId,
});
return _respond(response, responsePayload, log, callback);
});
});
return undefined;
@ -700,26 +907,137 @@ function deleteObjectTagging(request, response, log, callback) {
dataStoreVersionId, log, callback);
}
function _createAzureConditionalDeleteObjectGetInfo(request) {
const { objectKey, bucketName, headers } = request;
const objectGetInfo = dataClient.toObjectGetInfo(
objectKey, bucketName, headers['x-scal-storage-class']);
return Object.assign({}, objectGetInfo, {
options: {
accessConditions: {
DateUnModifiedSince: new Date(headers['if-unmodified-since']),
},
},
});
}
function _azureConditionalDelete(request, response, log, cb) {
const objectGetInfo = _createAzureConditionalDeleteObjectGetInfo(request);
const reqUids = log.getSerializedUids();
return dataClient.delete(objectGetInfo, reqUids, err => {
if (err && err.code === 412) {
log.info('precondition for Azure deletion was not met', {
method: '_azureConditionalDelete',
key: request.objectKey,
bucket: request.bucketName,
});
return cb(err);
}
if (err) {
log.error('error deleting object in Azure', {
error: err,
method: '_azureConditionalDelete',
});
return cb(err);
}
return _respond(response, {}, log, cb);
});
}
function _putTagging(request, response, log, cb) {
return handleTaggingOperation(
request, response, 'Put', undefined, log, err => {
if (err) {
log.error('put tagging failed', {
method: '_putTagging',
error: err,
});
return cb(err);
}
return _respond(response, null, log, cb);
});
}
function _conditionalTagging(request, response, locations, log, cb) {
return _getLastModified(locations, log, (err, lastModified) => {
if (err) {
return cb(err);
}
const ifUnmodifiedSince = request.headers['if-unmodified-since'];
if (new Date(ifUnmodifiedSince) < new Date(lastModified)) {
log.info('object has been modified, skipping tagging operation', {
method: '_conditionalTagging',
ifUnmodifiedSince,
lastModified,
key: request.objectKey,
bucket: request.bucketName,
});
return _respond(response, null, log, cb);
}
return _putTagging(request, response, log, cb);
});
}
function _performConditionalDelete(request, response, locations, log, cb) {
const { headers } = request;
const location = locationConstraints[headers['x-scal-storage-class']];
if (!request.headers['if-unmodified-since']) {
log.info('unknown last modified time, skipping conditional delete', {
method: '_performConditionalDelete',
});
return _respond(response, null, log, cb);
}
// Azure supports a conditional delete operation.
if (location && location.type === 'azure') {
return _azureConditionalDelete(request, response, log, cb);
}
// Other clouds do not support a conditional delete. Instead, we
// conditionally put tags to indicate if it should be deleted by the user.
return _conditionalTagging(request, response, locations, log, cb);
}
function _shouldConditionallyDelete(request, locations) {
if (locations.length === 0) {
return false;
}
const storageClass = request.headers['x-scal-storage-class'];
const type =
storageClass &&
locationConstraints[storageClass] &&
locationConstraints[storageClass].type;
const isExternalBackend = type && constants.externalBackends[type];
const isNotVersioned = !locations[0].dataStoreVersionId;
return isExternalBackend && isNotVersioned;
}
function batchDelete(request, response, log, callback) {
return _getRequestPayload(request, (err, payload) => {
if (err) {
return callback(err);
}
let request;
let parsedPayload;
try {
request = JSON.parse(payload);
parsedPayload = JSON.parse(payload);
} catch (e) {
// FIXME: add error type MalformedJSON
return callback(errors.MalformedPOSTRequest);
}
if (!request || !Array.isArray(request.Locations)) {
if (!parsedPayload || !Array.isArray(parsedPayload.Locations)) {
return callback(errors.MalformedPOSTRequest);
}
const locations = request.Locations;
const locations = parsedPayload.Locations;
if (_shouldConditionallyDelete(request, locations)) {
return _performConditionalDelete(
request, response, locations, log, callback);
}
log.trace('batch delete locations', { locations });
return async.eachLimit(locations, 5, (loc, next) => {
data.delete(loc, log, err => {
if (err && err.ObjNotFound) {
const _loc = Object.assign({}, loc);
if (_loc.dataStoreVersionId !== undefined) {
// required by cloud backends
_loc.deleteVersion = true;
}
dataWrapper.data.delete(_loc, log, err => {
if (err && err.is.ObjNotFound) {
log.info('batch delete: data location do not exist', {
method: 'batchDelete',
location: loc,
@ -764,31 +1082,85 @@ const backbeatRoutes = {
multiplebackenddata: {
deleteobject: deleteObject,
deleteobjecttagging: deleteObjectTagging,
abortmpu: abortMultipartUpload,
},
},
GET: {
metadata: getMetadata,
multiplebackendmetadata: headObject,
},
};
function routeBackbeat(clientIP, request, response, log) {
// Attach the apiMethod method to the request, so it can used by monitoring in the server
// eslint-disable-next-line no-param-reassign
request.apiMethod = 'routeBackbeat';
log.debug('routing request', {
method: 'routeBackbeat',
url: request.url,
});
_normalizeBackbeatRequest(request);
const useMultipleBackend = request.resourceType === 'multiplebackenddata';
const requestContexts = prepareRequestContexts('objectReplicate', request);
// proxy api requests to Backbeat API server
if (request.resourceType === 'api') {
if (!config.backbeat) {
log.debug('unable to proxy backbeat api request', {
backbeatConfig: config.backbeat,
});
return responseJSONBody(errors.MethodNotAllowed, null, response,
log);
}
const path = request.url.replace('/_/backbeat/api', '/_/');
const { host, port } = config.backbeat;
const target = `http://${host}:${port}${path}`;
return auth.server.doAuth(request, log, (err, userInfo) => {
if (err) {
log.debug('authentication error', {
error: err,
method: request.method,
bucketName: request.bucketName,
objectKey: request.objectKey,
});
return responseJSONBody(err, null, response, log);
}
// FIXME for now, any authenticated user can access API
// routes. We should introduce admin accounts or accounts
// with admin privileges, and restrict access to those
// only.
if (userInfo.getCanonicalID() === constants.publicId) {
log.debug('unauthenticated access to API routes', {
method: request.method,
bucketName: request.bucketName,
objectKey: request.objectKey,
});
return responseJSONBody(
errors.AccessDenied, null, response, log);
}
return backbeatProxy.web(request, response, { target }, err => {
log.error('error proxying request to api server',
{ error: err.message });
return responseJSONBody(errors.ServiceUnavailable, null,
response, log);
});
}, 's3', requestContexts);
}
const useMultipleBackend =
request.resourceType && request.resourceType.startsWith('multiplebackend');
const invalidRequest =
(!request.resourceType ||
(_isObjectRequest(request) &&
(!request.bucketName || !request.objectKey)) ||
(!request.query.operation && useMultipleBackend));
(!request.query.operation &&
request.resourceType === 'multiplebackenddata'));
const invalidRoute =
(backbeatRoutes[request.method] === undefined ||
backbeatRoutes[request.method][request.resourceType] === undefined ||
(backbeatRoutes[request.method][request.resourceType]
[request.query.operation] === undefined &&
useMultipleBackend));
request.resourceType === 'multiplebackenddata'));
log.addDefaultFields({
bucketName: request.bucketName,
objectKey: request.objectKey,
@ -797,8 +1169,8 @@ function routeBackbeat(clientIP, request, response, log) {
});
if (invalidRequest || invalidRoute) {
log.debug(invalidRequest ? 'invalid request' : 'no such route', {
method: request.method, bucketName: request.bucketName,
objectKey: request.objectKey, resourceType: request.resourceType,
method: request.method,
resourceType: request.resourceType,
query: request.query,
});
return responseJSONBody(errors.MethodNotAllowed, null, response, log);
@ -813,7 +1185,6 @@ function routeBackbeat(clientIP, request, response, log) {
return undefined;
});
}
const requestContexts = prepareRequestContexts('objectReplicate', request);
const decodedVidResult = decodeVersionId(request.query);
if (decodedVidResult instanceof Error) {
log.trace('invalid versionId query', {

View File

@ -10,10 +10,18 @@ const { clientCheck } = require('./utilities/healthcheckHandler');
const _config = require('./Config').config;
const { blacklistedPrefixes } = require('../constants');
const api = require('./api/api');
const data = require('./data/wrapper');
const dataWrapper = require('./data/wrapper');
const kms = require('./kms/wrapper');
const locationStorageCheck =
require('./api/apiUtils/object/locationStorageCheck');
const vault = require('./auth/vault');
const metadata = require('./metadata/wrapper');
const routes = arsenal.s3routes.routes;
const { parseLC, MultipleBackendGateway } = arsenal.storage.data;
const websiteEndpoints = _config.websiteEndpoints;
let client = dataWrapper.client;
const implName = dataWrapper.implName;
let allEndpoints;
function updateAllEndpoints() {
@ -21,6 +29,13 @@ function updateAllEndpoints() {
}
_config.on('rest-endpoints-update', updateAllEndpoints);
updateAllEndpoints();
_config.on('location-constraints-update', () => {
if (implName === 'multipleBackends') {
const clients = parseLC(_config, vault);
client = new MultipleBackendGateway(
clients, metadata, locationStorageCheck);
}
});
// redis client
let localCacheClient;
@ -78,7 +93,15 @@ class S3Server {
allEndpoints,
websiteEndpoints,
blacklistedPrefixes,
dataRetrievalFn: data.get,
dataRetrievalParams: {
client,
implName,
config: _config,
kms,
metadata,
locStorageCheckFn: locationStorageCheck,
vault,
},
};
routes(req, res, params, logger, _config);
}
@ -159,6 +182,7 @@ class S3Server {
}
initiateStartup(log) {
metadata.setup(() => {
clientCheck(true, log, (err, results) => {
if (err) {
log.info('initial health check failed, delaying startup', {
@ -177,6 +201,7 @@ class S3Server {
}
}
});
});
}
}

View File

@ -7,15 +7,13 @@ const ObjectMD = require('arsenal').models.ObjectMD;
const BucketInfo = require('arsenal').models.BucketInfo;
const acl = require('./metadata/acl');
const constants = require('../constants');
const data = require('./data/wrapper');
const { data } = require('./data/wrapper');
const metadata = require('./metadata/wrapper');
const logger = require('./utilities/logger');
const { setObjectLockInformation }
= require('./api/apiUtils/object/objectLockHelpers');
const removeAWSChunked = require('./api/apiUtils/object/removeAWSChunked');
const { parseTagFromQuery } = s3middleware.tagging;
const { config } = require('./Config');
const multipleBackendGateway = require('./data/multipleBackendGateway');
const usersBucket = constants.usersBucket;
const oldUsersBucket = constants.oldUsersBucket;
@ -306,20 +304,15 @@ const services = {
}
const objGetInfo = objectMD.location;
if (objGetInfo && objGetInfo[0]
&& config.backends.data === 'multiple') {
return multipleBackendGateway.protectAzureBlocks(bucketName,
objectKey, objGetInfo[0].dataStoreName, log, err => {
// if an error is returned, there is an MPU initiated with same
// key name as object to delete
// special case that prevents azure blocks from unecessary deletion
// will return null if no need
return data.protectAzureBlocks(bucketName, objectKey, objGetInfo,
log, err => {
if (err) {
return cb(err.customizeDescription('Error deleting ' +
`object on Azure: ${err.message}`));
return cb(err);
}
return deleteMDandData();
});
}
return deleteMDandData();
},
/**
@ -458,80 +451,6 @@ const services = {
});
},
/**
* Mark the MPU overview key with a flag when starting the
* CompleteMPU operation, to be checked by "put part" operations
*
* @param {object} params - params object
* @param {string} params.bucketName - name of MPU bucket
* @param {string} params.objectKey - object key
* @param {string} params.uploadId - upload ID
* @param {string} params.splitter - splitter for this overview key
* @param {object} params.storedMetadata - original metadata of the overview key
* @param {Logger} log - Logger object
* @param {function} cb - callback(err)
* @return {undefined}
*/
metadataMarkMPObjectForCompletion(params, log, cb) {
assert.strictEqual(typeof params, 'object');
assert.strictEqual(typeof params.bucketName, 'string');
assert.strictEqual(typeof params.objectKey, 'string');
assert.strictEqual(typeof params.uploadId, 'string');
assert.strictEqual(typeof params.splitter, 'string');
assert.strictEqual(typeof params.storedMetadata, 'object');
const splitter = params.splitter;
const longMPUIdentifier =
`overview${splitter}${params.objectKey}${splitter}${params.uploadId}`;
const multipartObjectMD = Object.assign({}, params.storedMetadata);
multipartObjectMD.completeInProgress = true;
metadata.putObjectMD(params.bucketName, longMPUIdentifier, multipartObjectMD,
{}, log, err => {
if (err) {
log.error('error from metadata', { error: err });
return cb(err);
}
return cb();
});
},
/**
* Returns if a CompleteMPU operation is in progress for this
* object, by looking at the `completeInProgress` flag stored in
* the overview key
*
* @param {object} params - params object
* @param {string} params.bucketName - bucket name where object should be stored
* @param {string} params.objectKey - object key
* @param {string} params.uploadId - upload ID
* @param {string} params.splitter - splitter for this overview key
* @param {object} log - request logger instance
* @param {function} cb - callback(err, {bool} completeInProgress)
* @return {undefined}
*/
isCompleteMPUInProgress(params, log, cb) {
assert.strictEqual(typeof params, 'object');
assert.strictEqual(typeof params.bucketName, 'string');
assert.strictEqual(typeof params.objectKey, 'string');
assert.strictEqual(typeof params.uploadId, 'string');
assert.strictEqual(typeof params.splitter, 'string');
const mpuBucketName = `${constants.mpuBucketPrefix}${params.bucketName}`;
const splitter = params.splitter;
const mpuOverviewKey =
`overview${splitter}${params.objectKey}${splitter}${params.uploadId}`;
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, {}, log,
(err, res) => {
if (err) {
log.error('error getting the overview object from mpu bucket', {
error: err,
method: 'services.isCompleteMPUInProgress',
params,
});
return cb(err);
}
return cb(null, Boolean(res.completeInProgress));
});
},
/**
* Checks whether bucket exists, multipart upload
@ -555,7 +474,7 @@ const services = {
// If the MPU was initiated, the mpu bucket should exist.
const mpuBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
metadata.getBucket(mpuBucketName, log, (err, mpuBucket) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
log.debug('bucket not found in metadata', { error: err,
method: 'services.metadataValidateMultipart' });
return cb(errors.NoSuchUpload);
@ -577,7 +496,7 @@ const services = {
metadata.getObjectMD(mpuBucket.getName(), mpuOverviewKey,
{}, log, (err, storedMetadata) => {
if (err) {
if (err.NoSuchKey) {
if (err.is.NoSuchKey) {
return cb(errors.NoSuchUpload);
}
log.error('error from metadata', { error: err });
@ -753,7 +672,7 @@ const services = {
assert.strictEqual(typeof bucketName, 'string');
const MPUBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
metadata.getBucket(MPUBucketName, log, (err, bucket) => {
if (err && err.NoSuchBucket) {
if (err && err.is.NoSuchBucket) {
log.trace('no buckets found');
const creationDate = new Date().toJSON();
const mpuBucket = new BucketInfo(MPUBucketName,

View File

@ -1,6 +1,6 @@
const { errors, ipCheck } = require('arsenal');
const _config = require('../Config').config;
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const vault = require('../auth/vault');
const metadata = require('../metadata/wrapper');
const async = require('async');

View File

@ -5,7 +5,7 @@ const { errors, ipCheck } = require('arsenal');
const async = require('async');
const config = require('../Config').config;
const data = require('../data/wrapper');
const { data } = require('../data/wrapper');
const metadata = require('../metadata/wrapper');
const REPORT_MODEL_VERSION = 1;

View File

@ -20,11 +20,10 @@
"homepage": "https://github.com/scality/S3#readme",
"dependencies": {
"@hapi/joi": "^17.1.0",
"arsenal": "git+https://github.com/scality/arsenal#7.10.15",
"arsenal": "git+https://github.com/scality/Arsenal#7.10.23",
"async": "~2.5.0",
"aws-sdk": "2.905.0",
"azure-storage": "^2.1.0",
"bucketclient": "scality/bucketclient#7.10.2",
"commander": "^2.9.0",
"cron-parser": "^2.11.0",
"diskusage": "1.1.3",
@ -34,7 +33,6 @@
"level-mem": "^5.0.1",
"moment": "^2.26.0",
"npm-run-all": "~4.1.5",
"sproxydclient": "scality/sproxydclient#7.10.2",
"utapi": "scality/utapi#7.10.6",
"utf8": "~2.1.1",
"uuid": "^3.0.1",

View File

@ -81,7 +81,7 @@ describe('aws-node-sdk test deleteBucketReplication', () => {
}),
next => deleteReplicationAndCheckResponse(bucket, next),
next => s3.getBucketReplication({ Bucket: bucket }, err => {
assert(errors.ReplicationConfigurationNotFoundError[err.code]);
assert(errors.ReplicationConfigurationNotFoundError.is[err.code]);
return next();
}),
], done));

View File

@ -23,7 +23,7 @@ describe('aws-sdk test get bucket encryption', () => {
before(done => {
const config = getConfig('default', { signatureVersion: 'v4' });
s3 = new S3(config);
return done();
return metadata.setup(done);
});
beforeEach(done => s3.createBucket({ Bucket: bucketName }, done));

View File

@ -45,7 +45,7 @@ describe('aws-node-sdk test getBucketReplication', () => {
it("should return 'ReplicationConfigurationNotFoundError' if bucket does " +
'not have a replication configuration', done =>
s3.getBucketReplication({ Bucket: bucket }, err => {
assert(errors.ReplicationConfigurationNotFoundError[err.code]);
assert(errors.ReplicationConfigurationNotFoundError.is[err.code]);
return done();
}));

View File

@ -210,39 +210,5 @@ describe('Complete MPU', () => {
});
});
});
describe('with re-upload of part during CompleteMPU execution', () => {
let uploadId;
let eTag;
beforeEach(() => _initiateMpuAndPutOnePart()
.then(result => {
uploadId = result.uploadId;
eTag = result.eTag;
})
);
it('should complete the MPU successfully and leave a readable object', done => {
async.parallel([
doneReUpload => s3.uploadPart({
Bucket: bucket,
Key: key,
PartNumber: 1,
UploadId: uploadId,
Body: 'foo',
}, err => {
// in case the CompleteMPU finished earlier,
// we may get a NoSuchKey error, so just
// ignore it
if (err && err.code === 'NoSuchKey') {
return doneReUpload();
}
return doneReUpload(err);
}),
doneComplete => _completeMpuAndCheckVid(
uploadId, eTag, undefined, doneComplete),
], done);
});
});
});
});

View File

@ -577,72 +577,6 @@ describe('Object Part Copy', () => {
checkNoError(err);
});
});
it('should not corrupt object if overwriting an existing part by copying a part ' +
'while the MPU is being completed', () => {
// AWS response etag for this completed MPU
const finalObjETag = '"db77ebbae9e9f5a244a26b86193ad818-1"';
process.stdout.write('Putting first part in MPU test');
return s3.uploadPartCopy({ Bucket: destBucketName,
Key: destObjName,
CopySource: `${sourceBucketName}/${sourceObjName}`,
PartNumber: 1,
UploadId: uploadId,
}).promise().then(res => {
assert.strictEqual(res.ETag, etag);
assert(res.LastModified);
}).then(() => {
process.stdout.write('Overwriting first part in MPU test and completing MPU ' +
'at the same time');
return Promise.all([
s3.uploadPartCopy({
Bucket: destBucketName,
Key: destObjName,
CopySource: `${sourceBucketName}/${sourceObjName}`,
PartNumber: 1,
UploadId: uploadId,
}).promise().catch(err => {
// in case the CompleteMPU finished
// earlier, we may get a NoSuchKey error,
// so just ignore it and resolve with a
// special value, otherwise re-throw the
// error
if (err && err.code === 'NoSuchKey') {
return Promise.resolve(null);
}
throw err;
}),
s3.completeMultipartUpload({
Bucket: destBucketName,
Key: destObjName,
UploadId: uploadId,
MultipartUpload: {
Parts: [
{ ETag: etag, PartNumber: 1 },
],
},
}).promise(),
]);
}).then(([uploadRes, completeRes]) => {
// if upload succeeded before CompleteMPU finished
if (uploadRes !== null) {
assert.strictEqual(uploadRes.ETag, etag);
assert(uploadRes.LastModified);
}
assert.strictEqual(completeRes.Bucket, destBucketName);
assert.strictEqual(completeRes.Key, destObjName);
assert.strictEqual(completeRes.ETag, finalObjETag);
}).then(() => {
process.stdout.write('Getting object put by MPU with ' +
'overwrite part');
return s3.getObject({
Bucket: destBucketName,
Key: destObjName,
}).promise();
}).then(res => {
assert.strictEqual(res.ETag, finalObjETag);
});
});
});
it('should return an error if no such upload initiated',

View File

@ -86,7 +86,7 @@ describe('GET object legal hold', () => {
s3.getObjectLegalHold({
Bucket: bucket,
Key: key,
VersionId: '000000000000',
VersionId: '012345678901234567890123456789012',
}, err => {
checkError(err, 'NoSuchVersion', 404);
done();

View File

@ -3,7 +3,18 @@ const async = require('async');
const withV4 = require('../support/withV4');
const BucketUtility = require('../../lib/utility/bucket-util');
const objectConfigs = require('../support/objectConfigs');
const { maximumAllowedPartCount } = require('../../../../../constants');
const bucket = 'mpu-test-bucket';
const object = 'mpu-test-object';
const bodySize = 1024 * 1024 * 5;
const bodyContent = 'a';
const howManyParts = 3;
const partNumbers = Array.from(Array(howManyParts).keys());
const invalidPartNumbers = [-1, 0, maximumAllowedPartCount + 1];
let ETags = [];
function checkError(err, statusCode, code) {
assert.strictEqual(err.statusCode, statusCode);
@ -15,28 +26,22 @@ function checkNoError(err) {
`Expected success, got error ${JSON.stringify(err)}`);
}
function generateContent(size, bodyContent) {
return Buffer.alloc(size, bodyContent);
function generateContent(partNumber) {
return Buffer.alloc(bodySize + partNumber, bodyContent);
}
describe('Part size tests with object head', () => {
objectConfigs.forEach(config => {
describe(config.signature, () => {
let ETags = [];
const {
bucket,
object,
bodySize,
bodyContent,
partNumbers,
invalidPartNumbers,
} = config;
withV4(sigCfg => { //eslint-disable-line
withV4(sigCfg => {
let bucketUtil;
let s3;
function headObject(fields, cb) {
s3.headObject(Object.assign({
Bucket: bucket,
Key: object,
}, fields), cb);
}
beforeEach(function beforeF(done) {
bucketUtil = new BucketUtility('default', sigCfg);
s3 = bucketUtil.s3;
@ -50,16 +55,12 @@ describe('Part size tests with object head', () => {
return next();
}),
next => async.mapSeries(partNumbers, (partNumber, callback) => {
let allocAmount = bodySize + partNumber + 1;
if (config.signature === 'for empty object') {
allocAmount = 0;
}
const uploadPartParams = {
Bucket: bucket,
Key: object,
PartNumber: partNumber + 1,
UploadId: this.currentTest.UploadId,
Body: generateContent(allocAmount, bodyContent),
Body: generateContent(partNumber + 1),
};
return s3.uploadPart(uploadPartParams,
@ -104,11 +105,10 @@ describe('Part size tests with object head', () => {
it('should return the total size of the object ' +
'when --part-number is not used', done => {
const totalSize = config.meta.computeTotalSize(partNumbers, bodySize);
s3.headObject({ Bucket: bucket, Key: object }, (err, data) => {
const totalSize = partNumbers.reduce((total, current) =>
total + (bodySize + current + 1), 0);
headObject({}, (err, data) => {
checkNoError(err);
assert.equal(totalSize, data.ContentLength);
done();
});
@ -119,12 +119,8 @@ describe('Part size tests with object head', () => {
`when --part-number is set to ${part + 1}`, done => {
const partNumber = Number.parseInt(part, 0) + 1;
const partSize = bodySize + partNumber;
s3.headObject({ Bucket: bucket, Key: object, PartNumber: partNumber }, (err, data) => {
headObject({ PartNumber: partNumber }, (err, data) => {
checkNoError(err);
if (data.ContentLength === 0) {
done();
}
assert.equal(partSize, data.ContentLength);
done();
});
@ -134,7 +130,7 @@ describe('Part size tests with object head', () => {
invalidPartNumbers.forEach(part => {
it(`should return an error when --part-number is set to ${part}`,
done => {
s3.headObject({ Bucket: bucket, Key: object, PartNumber: part }, (err, data) => {
headObject({ PartNumber: part }, (err, data) => {
checkError(err, 400, 'BadRequest');
assert.strictEqual(data, null);
done();
@ -142,26 +138,15 @@ describe('Part size tests with object head', () => {
});
});
it('when incorrect --part-number is used', done => {
bucketUtil = new BucketUtility('default', sigCfg);
s3 = bucketUtil.s3;
s3.headObject({ Bucket: bucket, Key: object, PartNumber: partNumbers.length + 1 },
it('should return an error when incorrect --part-number is used',
done => {
headObject({ PartNumber: partNumbers.length + 1 },
(err, data) => {
if (config.meta.objectIsEmpty) {
// returns metadata for the only empty part
checkNoError(err);
assert.strictEqual(data.ContentLength, 0);
done();
} else {
// returns a 416 error
// the error response does not contain the actual
// statusCode instead it has '416'
checkError(err, 416, 416);
assert.strictEqual(data, null);
done();
}
});
});
});
});
});

View File

@ -106,7 +106,7 @@ describe('GET object retention', () => {
s3.getObjectRetention({
Bucket: bucketName,
Key: objectName,
VersionId: '000000000000',
VersionId: '012345678901234567890123456789012',
}, err => {
checkError(err, 'NoSuchVersion', 404);
done();

View File

@ -98,7 +98,7 @@ describe('PUT object legal hold', () => {
s3.putObjectLegalHold({
Bucket: bucket,
Key: key,
VersionId: '000000000000',
VersionId: '012345678901234567890123456789012',
LegalHold: mockLegalHold.on,
}, err => {
checkError(err, 'NoSuchVersion', 404);

View File

@ -79,7 +79,7 @@ describe('PUT object retention', () => {
s3.putObjectRetention({
Bucket: bucketName,
Key: objectName,
VersionId: '000000000000',
VersionId: '012345678901234567890123456789012',
Retention: retentionConfig,
}, err => {
checkError(err, 'NoSuchVersion', 404);

View File

@ -1,40 +0,0 @@
const { maximumAllowedPartCount } = require('../../../../../constants');
const canonicalObjectConfig = {
bucket: 'mpu-test-bucket-canonical-object',
object: 'mpu-test-object-canonical',
bodySize: 1024 * 1024 * 5,
bodyContent: 'a',
howManyParts: 3,
partNumbers: Array.from(Array(3).keys()), // 3 corresponds to howManyParts
invalidPartNumbers: [-1, 0, maximumAllowedPartCount + 1],
signature: 'for canonical object',
meta: {
computeTotalSize: (partNumbers, bodySize) => partNumbers.reduce((total, current) =>
total + bodySize + current + 1
, 0),
objectIsEmpty: false,
},
};
const emptyObjectConfig = {
bucket: 'mpu-test-bucket-empty-object',
object: 'mpu-test-object-empty',
bodySize: 0,
bodyContent: null,
howManyParts: 1,
partNumbers: Array.from(Array(1).keys()), // 1 corresponds to howManyParts
invalidPartNumbers: [-1, 0, maximumAllowedPartCount + 1],
signature: 'for empty object',
meta: {
computeTotalSize: () => 0,
objectIsEmpty: true,
},
};
const objectConfigs = [
canonicalObjectConfig,
emptyObjectConfig,
];
module.exports = objectConfigs;

View File

@ -27,7 +27,6 @@ const testData = 'testkey data';
const testDataMd5 = crypto.createHash('md5')
.update(testData, 'utf-8')
.digest('hex');
const emptyContentsMd5 = 'd41d8cd98f00b204e9800998ecf8427e';
const testMd = {
'md-model-version': 2,
'owner-display-name': 'Bart',
@ -61,17 +60,6 @@ const testMd = {
},
};
function checkObjectData(s3, objectKey, dataValue, done) {
s3.getObject({
Bucket: TEST_BUCKET,
Key: objectKey,
}, (err, data) => {
assert.ifError(err);
assert.strictEqual(data.Body.toString(), dataValue);
done();
});
}
/** makeBackbeatRequest - utility function to generate a request going
* through backbeat route
* @param {object} params - params for making request
@ -428,8 +416,8 @@ describeSkipIfAWS('backbeat routes', () => {
});
});
it('should remove old object data locations if version is overwritten ' +
'with same contents', done => {
it('should remove old object data locations if version is overwritten',
done => {
let oldLocation;
const testKeyOldData = `${testKey}-old-data`;
async.waterfall([next => {
@ -503,8 +491,14 @@ describeSkipIfAWS('backbeat routes', () => {
}, (response, next) => {
assert.strictEqual(response.statusCode, 200);
// give some time for the async deletes to complete
setTimeout(() => checkObjectData(s3, testKey, testData, next),
1000);
setTimeout(() => s3.getObject({
Bucket: TEST_BUCKET,
Key: testKey,
}, (err, data) => {
assert.ifError(err);
assert.strictEqual(data.Body.toString(), testData);
next();
}), 1000);
}, next => {
// check that the object copy referencing the old data
// locations is unreadable, confirming that the old
@ -522,89 +516,6 @@ describeSkipIfAWS('backbeat routes', () => {
done();
});
});
it('should remove old object data locations if version is overwritten ' +
'with empty contents', done => {
let oldLocation;
const testKeyOldData = `${testKey}-old-data`;
async.waterfall([next => {
// put object's data locations
makeBackbeatRequest({
method: 'PUT', bucket: TEST_BUCKET,
objectKey: testKey,
resourceType: 'data',
headers: {
'content-length': testData.length,
'content-md5': testDataMd5,
'x-scal-canonical-id': testArn,
},
authCredentials: backbeatAuthCredentials,
requestBody: testData }, next);
}, (response, next) => {
assert.strictEqual(response.statusCode, 200);
// put object metadata
const newMd = Object.assign({}, testMd);
newMd.location = JSON.parse(response.body);
oldLocation = newMd.location;
makeBackbeatRequest({
method: 'PUT', bucket: TEST_BUCKET,
objectKey: testKey,
resourceType: 'metadata',
authCredentials: backbeatAuthCredentials,
requestBody: JSON.stringify(newMd),
}, next);
}, (response, next) => {
assert.strictEqual(response.statusCode, 200);
// put another object which metadata reference the
// same data locations, we will attempt to retrieve
// this object at the end of the test to confirm that
// its locations have been deleted
const oldDataMd = Object.assign({}, testMd);
oldDataMd.location = oldLocation;
makeBackbeatRequest({
method: 'PUT', bucket: TEST_BUCKET,
objectKey: testKeyOldData,
resourceType: 'metadata',
authCredentials: backbeatAuthCredentials,
requestBody: JSON.stringify(oldDataMd),
}, next);
}, (response, next) => {
assert.strictEqual(response.statusCode, 200);
// overwrite the original object version with an empty location
const newMd = Object.assign({}, testMd);
newMd['content-length'] = 0;
newMd['content-md5'] = emptyContentsMd5;
newMd.location = null;
makeBackbeatRequest({
method: 'PUT', bucket: TEST_BUCKET,
objectKey: testKey,
resourceType: 'metadata',
authCredentials: backbeatAuthCredentials,
requestBody: JSON.stringify(newMd),
}, next);
}, (response, next) => {
assert.strictEqual(response.statusCode, 200);
// give some time for the async deletes to complete
setTimeout(() => checkObjectData(s3, testKey, '', next),
1000);
}, next => {
// check that the object copy referencing the old data
// locations is unreadable, confirming that the old
// data locations have been deleted
s3.getObject({
Bucket: TEST_BUCKET,
Key: testKeyOldData,
}, err => {
assert(err, 'expected error to get object with old data ' +
'locations, got success');
next();
});
}], err => {
assert.ifError(err);
done();
});
});
it('should not remove data locations on replayed metadata PUT',
done => {
let serializedNewMd;

View File

@ -5,7 +5,7 @@ const https = require('https');
const querystring = require('querystring');
const conf = require('../../../../lib/Config').config;
const { GcpSigner } = require('../../../../lib/data/external/GCP');
const { GcpSigner } = require('arsenal').storage.data.external.GCP;
const transport = conf.https ? https : http;
const ipAddress = process.env.IP ? process.env.IP : '127.0.0.1';

View File

@ -132,7 +132,7 @@
"details": {
"gcpEndpoint": "storage.googleapis.com",
"bucketName": "zenko-gcp-bucket-2",
"mpuBucketName": "zenko-gcp-mpu",
"mpuBucketName": "zenko-gcp-mpu-2",
"overflowBucketName": "zenko-gcp-overflow",
"bucketMatch": true,
"credentialsProfile": "google_2",

View File

@ -1,6 +1,5 @@
'use strict'; // eslint-disable-line strict
const assert = require('assert');
const { errors } = require('arsenal');
const DummyRequestLogger = require('../unit/helpers').DummyRequestLogger;
const clientCheck
= require('../../lib/utilities/healthcheckHandler').clientCheck;
@ -22,7 +21,7 @@ describe('Healthcheck response', () => {
clientCheck(true, log, (err, results) => {
const resultKeys = Object.keys(results);
locConstraints.forEach(constraint => {
assert(resultKeys.includes(constraint));
assert(resultKeys.includes(constraint), `constraint: ${constraint} not in results: ${resultKeys}`);
});
done();
});
@ -71,8 +70,7 @@ describe('Healthcheck response', () => {
const azureLocationNonExistContainerError =
results[azureLocationNonExistContainer].error;
if (err) {
assert.strictEqual(err, errors.InternalError,
`got unexpected err in clientCheck: ${err}`);
assert(err.is.InternalError, `got unexpected err in clientCheck: ${err}`);
assert(azureLocationNonExistContainerError.startsWith(
'The specified container is being deleted.'));
return done();

View File

@ -11,7 +11,7 @@ const { cleanup, DummyRequestLogger, makeAuthInfo, versioningTestUtils } =
require('../unit/helpers');
const DummyRequest = require('../unit/DummyRequest');
const { config } = require('../../lib/Config');
const metadata = require('../../lib/metadata/in_memory/metadata').metadata;
const { metadata } = require('arsenal').storage.metadata.inMemory.metadata;
const { bucketPut } = require('../../lib/api/bucketPut');
const objectPut = require('../../lib/api/objectPut');
@ -46,7 +46,7 @@ const mpuBucket = `${constants.mpuBucketPrefix}${bucketName}`;
const awsBucket = config.locationConstraints[awsLocation].details.bucketName;
const smallBody = Buffer.from('I am a body', 'utf8');
const bigBody = Buffer.alloc(10485760);
const locMetaHeader = 'x-amz-meta-scal-location-constraint';
const locMetaHeader = 'scal-location-constraint';
const bucketPutRequest = {
bucketName,
namespace,
@ -233,7 +233,7 @@ function assertObjOnBackend(expectedBackend, objectKey, cb) {
return objectGet(authInfo, getObjectGetRequest(zenkoObjectKey), false, log,
(err, result, metaHeaders) => {
assert.equal(err, null, `Error getting object on S3: ${err}`);
assert.strictEqual(metaHeaders[locMetaHeader], expectedBackend);
assert.strictEqual(metaHeaders[`x-amz-meta-${locMetaHeader}`], expectedBackend);
if (expectedBackend === awsLocation) {
return s3.headObject({ Bucket: awsBucket, Key: objectKey },
(err, result) => {
@ -513,8 +513,7 @@ describe('Multipart Upload API with AWS Backend', function mpuTestSuite() {
const fakeKey = `key-${Date.now()}`;
const delParams = getDeleteParams(fakeKey, fakeUploadId);
multipartDelete(authInfo, delParams, log, err => {
assert.equal(err, errors.NoSuchUpload,
`Error aborting MPU: ${err}`);
assert(err.is.NoSuchUpload, `Error aborting MPU: ${err}`);
done();
});
});
@ -825,7 +824,7 @@ describe('Multipart Upload API with AWS Backend', function mpuTestSuite() {
(uploadId, next) => {
const listParams = getListParams(objectKey, uploadId);
listParts(authInfo, listParams, log, err => {
assert(err.NoSuchUpload);
assert(err.is.NoSuchUpload);
next();
});
},

View File

@ -4,7 +4,7 @@ const async = require('async');
const { bucketPut } = require('../../lib/api/bucketPut');
const objectPut = require('../../lib/api/objectPut');
const objectCopy = require('../../lib/api/objectCopy');
const { metadata } = require('../../lib/metadata/in_memory/metadata');
const { metadata } = require('arsenal').storage.metadata.inMemory.metadata;
const DummyRequest = require('../unit/DummyRequest');
const { cleanup, DummyRequestLogger, makeAuthInfo }
= require('../unit/helpers');

View File

@ -2,7 +2,7 @@ const assert = require('assert');
const { cleanup, DummyRequestLogger, makeAuthInfo }
= require('../unit/helpers');
const { ds } = require('../../lib/data/in_memory/backend');
const { ds } = require('arsenal').storage.data.inMemory.datastore;
const { bucketPut } = require('../../lib/api/bucketPut');
const objectPut = require('../../lib/api/objectPut');
const DummyRequest = require('../unit/DummyRequest');

View File

@ -5,14 +5,14 @@ const AWS = require('aws-sdk');
const { cleanup, DummyRequestLogger, makeAuthInfo }
= require('../unit/helpers');
const { ds } = require('../../lib/data/in_memory/backend');
const { ds } = require('arsenal').storage.data.inMemory.datastore;
const { bucketPut } = require('../../lib/api/bucketPut');
const initiateMultipartUpload
= require('../../lib/api/initiateMultipartUpload');
const objectPut = require('../../lib/api/objectPut');
const objectPutCopyPart = require('../../lib/api/objectPutCopyPart');
const DummyRequest = require('../unit/DummyRequest');
const { metadata } = require('../../lib/metadata/in_memory/metadata');
const { metadata } = require('arsenal').storage.metadata.inMemory.metadata;
const constants = require('../../constants');
const s3 = new AWS.S3();
@ -148,7 +148,7 @@ errorPutCopyPart) {
bucketName, sourceObjName, undefined, log, (err, copyResult) => {
if (errorPutCopyPart) {
assert.strictEqual(err.code, errorPutCopyPart.statusCode);
assert(err[errorPutCopyPart.code]);
assert(err.is[errorPutCopyPart.code]);
return cb();
}
assert.strictEqual(err, null);

View File

@ -6,13 +6,13 @@ const AWS = require('aws-sdk');
const { config } = require('../../lib/Config');
const { cleanup, DummyRequestLogger, makeAuthInfo }
= require('../unit/helpers');
const { ds } = require('../../lib/data/in_memory/backend');
const { ds } = require('arsenal').storage.data.inMemory.datastore;
const { bucketPut } = require('../../lib/api/bucketPut');
const initiateMultipartUpload
= require('../../lib/api/initiateMultipartUpload');
const objectPutPart = require('../../lib/api/objectPutPart');
const DummyRequest = require('../unit/DummyRequest');
const { metadata } = require('../../lib/metadata/in_memory/metadata');
const { metadata } = require('arsenal').storage.metadata.inMemory.metadata;
const mdWrapper = require('../../lib/metadata/wrapper');
const constants = require('../../constants');
const { getRealAwsConfig } =

View File

@ -3,15 +3,14 @@ const assert = require('assert');
const BucketInfo = require('arsenal').models.BucketInfo;
const getReplicationInfo =
require('../../../../lib/api/apiUtils/object/getReplicationInfo');
const { makeAuthInfo } = require('../../helpers');
function _getObjectReplicationInfo(replicationConfig, authInfo, isDeleteMarker) {
function _getObjectReplicationInfo(replicationConfig) {
const bucketInfo = new BucketInfo(
'testbucket', 'someCanonicalId', 'accountDisplayName',
new Date().toJSON(),
null, null, null, null, null, null, null, null, null,
replicationConfig);
return getReplicationInfo('fookey', bucketInfo, true, 123, null, null, authInfo, isDeleteMarker);
return getReplicationInfo('fookey', bucketInfo, true, 123, null, null);
}
describe('getReplicationInfo helper', () => {
@ -41,65 +40,6 @@ describe('getReplicationInfo helper', () => {
});
});
it('should get replication info when action comming from a non-lifecycle session', () => {
const replicationConfig = {
role: 'arn:aws:iam::root:role/s3-replication-role',
rules: [{
prefix: '',
enabled: true,
storageClass: 'awsbackend',
}],
destination: 'tosomewhere',
};
const authInfo = makeAuthInfo('accessKey1', null, 'another-session');
const replicationInfo = _getObjectReplicationInfo(replicationConfig, authInfo, true);
assert.deepStrictEqual(replicationInfo, {
status: 'PENDING',
backends: [{
site: 'awsbackend',
status: 'PENDING',
dataStoreVersionId: '',
}],
content: ['METADATA'],
destination: 'tosomewhere',
storageClass: 'awsbackend',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
});
});
it('should get replication info when action comming from a lifecycle session ' +
'but action is not delete marker', () => {
const replicationConfig = {
role: 'arn:aws:iam::root:role/s3-replication-role',
rules: [{
prefix: '',
enabled: true,
storageClass: 'awsbackend',
}],
destination: 'tosomewhere',
};
const authInfo = makeAuthInfo('accessKey1', null, 'backbeat-lifecycle');
const replicationInfo = _getObjectReplicationInfo(replicationConfig, authInfo, false);
assert.deepStrictEqual(replicationInfo, {
status: 'PENDING',
backends: [{
site: 'awsbackend',
status: 'PENDING',
dataStoreVersionId: '',
}],
content: ['METADATA'],
destination: 'tosomewhere',
storageClass: 'awsbackend',
role: 'arn:aws:iam::root:role/s3-replication-role',
storageType: 'aws_s3',
});
});
it('should not get replication info when rules are disabled', () => {
const replicationConfig = {
role: 'arn:aws:iam::root:role/s3-replication-role',
@ -113,21 +53,4 @@ describe('getReplicationInfo helper', () => {
const replicationInfo = _getObjectReplicationInfo(replicationConfig);
assert.deepStrictEqual(replicationInfo, undefined);
});
it('should not get replication info when action comming from lifecycle session', () => {
const replicationConfig = {
role: 'arn:aws:iam::root:role/s3-replication-role',
rules: [{
prefix: '',
enabled: true,
storageClass: 'awsbackend',
}],
destination: 'tosomewhere',
};
const authInfo = makeAuthInfo('accessKey1', null, 'backbeat-lifecycle');
const replicationInfo = _getObjectReplicationInfo(replicationConfig, authInfo, true);
assert.deepStrictEqual(replicationInfo, undefined);
});
});

View File

@ -38,16 +38,4 @@ describe('Check if location keys have changed between object locations', () => {
const curr = [{ key: 'ddd' }, { key: 'eee' }, { key: 'fff' }];
assert.strictEqual(locationKeysHaveChanged(prev, curr), true);
});
it('should return true if curr location is null', () => {
const prev = [{ key: 'ddd' }, { key: 'eee' }, { key: 'fff' }];
const curr = null;
assert.strictEqual(locationKeysHaveChanged(prev, curr), true);
});
it('should return true if both prev and curr locations are null', () => {
const prev = null;
const curr = null;
assert.strictEqual(locationKeysHaveChanged(prev, curr), true);
});
});

View File

@ -37,7 +37,7 @@ describe('objectLockHelpers: validateHeaders', () => {
= validateHeaders(objLockDisabledBucketInfo, headers, log);
const expectedError = errors.InvalidRequest.customizeDescription(
'Bucket is missing ObjectLockConfiguration');
assert.strictEqual(objectLockValidationError.InvalidRequest, true);
assert.strictEqual(objectLockValidationError.is.InvalidRequest, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});
@ -90,7 +90,7 @@ describe('objectLockHelpers: validateHeaders', () => {
const expectedError = errors.InvalidArgument.customizeDescription(
'x-amz-object-lock-retain-until-date and x-amz-object-lock-mode ' +
'must both be supplied');
assert.strictEqual(objectLockValidationError.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.is.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});
@ -104,7 +104,7 @@ describe('objectLockHelpers: validateHeaders', () => {
const expectedError = errors.InvalidArgument.customizeDescription(
'x-amz-object-lock-retain-until-date and x-amz-object-lock-mode ' +
'must both be supplied');
assert.strictEqual(objectLockValidationError.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.is.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});
@ -118,7 +118,7 @@ describe('objectLockHelpers: validateHeaders', () => {
'The retain until date must be in the future!');
const objectLockValidationError
= validateHeaders(bucketInfo, headers, log);
assert.strictEqual(objectLockValidationError.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.is.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});
@ -131,7 +131,7 @@ describe('objectLockHelpers: validateHeaders', () => {
= validateHeaders(bucketInfo, headers, log);
const expectedError = errors.InvalidArgument.customizeDescription(
'Legal hold status must be one of "ON", "OFF"');
assert.strictEqual(objectLockValidationError.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.is.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});
@ -145,7 +145,7 @@ describe('objectLockHelpers: validateHeaders', () => {
= validateHeaders(bucketInfo, headers, log);
const expectedError = errors.InvalidArgument.customizeDescription(
'Unknown wormMode directive');
assert.strictEqual(objectLockValidationError.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.is.InvalidArgument, true);
assert.strictEqual(objectLockValidationError.description,
expectedError.description);
});

View File

@ -1,41 +0,0 @@
const assert = require('assert');
const { isLifecycleSession } =
require('../../../../lib/api/apiUtils/authorization/permissionChecks.js');
const tests = [
{
arn: 'arn:aws:sts::257038443293:assumed-role/rolename/backbeat-lifecycle',
description: 'a role assumed by lifecycle service',
expectedResult: true,
},
{
arn: undefined,
description: 'undefined',
expectedResult: false,
},
{
arn: '',
description: 'empty',
expectedResult: false,
},
{
arn: 'arn:aws:iam::257038443293:user/bart',
description: 'a user',
expectedResult: false,
},
{
arn: 'arn:aws:sts::257038443293:assumed-role/rolename/other-service',
description: 'a role assumed by another service',
expectedResult: false,
},
];
describe('authInfoHelper', () => {
tests.forEach(t => {
it(`should return ${t.expectedResult} if arn is ${t.description}`, () => {
const result = isLifecycleSession(t.arn);
assert.equal(result, t.expectedResult);
});
});
});

View File

@ -1,6 +1,6 @@
const assert = require('assert');
const { errors, versioning } = require('arsenal');
const { versioning } = require('arsenal');
const { config } = require('../../../../lib/Config');
const INF_VID = versioning.VersionID.getInfVid(config.replicationGroupId);
@ -415,7 +415,7 @@ describe('versioning helpers', () => {
versionId: 'v1',
},
reqVersionId: 'null',
expectedError: errors.NoSuchKey,
expectedError: 'NoSuchKey',
},
].forEach(testCase => it(testCase.description, done => {
const mockBucketMD = {
@ -425,7 +425,7 @@ describe('versioning helpers', () => {
'foobucket', mockBucketMD, testCase.objMD,
testCase.reqVersionId, null, (err, options) => {
if (testCase.expectedError) {
assert.strictEqual(err, testCase.expectedError);
assert.strictEqual(err.is[testCase.expectedError], true);
} else {
assert.ifError(err);
assert.deepStrictEqual(options, testCase.expectedRes);

View File

@ -10,7 +10,7 @@ const constants = require('../../../constants');
const initiateMultipartUpload
= require('../../../lib/api/initiateMultipartUpload');
const metadata = require('../metadataswitch');
const metadataMem = require('../../../lib/metadata/in_memory/metadata');
const metadataMem = require('arsenal').storage.metadata.inMemory.metadata;
const objectPut = require('../../../lib/api/objectPut');
const objectPutPart = require('../../../lib/api/objectPutPart');
const { cleanup, DummyRequestLogger, makeAuthInfo } = require('../helpers');
@ -146,7 +146,7 @@ describe('bucketDelete API', () => {
bucketPut(authInfo, testRequest, log, () => {
bucketDelete(authInfo, testRequest, log, () => {
metadata.getBucket(bucketName, log, (err, md) => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
assert.strictEqual(md, undefined);
metadata.listObject(usersBucket, { prefix: canonicalID },
log, (err, listResponse) => {

View File

@ -28,7 +28,7 @@ describe('getBucketLifecycle API', () => {
'bucket has no lifecycle', done => {
const lifecycleRequest = getLifecycleRequest(bucketName);
bucketGetLifecycle(authInfo, lifecycleRequest, log, err => {
assert.strictEqual(err.NoSuchLifecycleConfiguration, true);
assert.strictEqual(err.is.NoSuchLifecycleConfiguration, true);
done();
});
});

View File

@ -74,7 +74,7 @@ describe('bucketGetObjectLock API', () => {
'object lock is not enabled on the bucket', done => {
const objectLockRequest = getObjectLockConfigRequest(bucketName);
bucketGetObjectLock(authInfo, objectLockRequest, log, err => {
assert.strictEqual(err.ObjectLockConfigurationNotFoundError, true);
assert.strictEqual(err.is.ObjectLockConfigurationNotFoundError, true);
done();
});
});

View File

@ -44,7 +44,7 @@ describe('getBucketPolicy API', () => {
it('should return NoSuchBucketPolicy error if ' +
'bucket has no policy', done => {
bucketGetPolicy(authInfo, testBasicRequest, log, err => {
assert.strictEqual(err.NoSuchBucketPolicy, true);
assert.strictEqual(err.is.NoSuchBucketPolicy, true);
done();
});
});

View File

@ -83,8 +83,7 @@ describe('checkLocationConstraint function', () => {
if (testCheck.isError) {
assert.notEqual(checkLocation.error, null,
'Expected failure but got success');
assert.strictEqual(checkLocation.error.
InvalidLocationConstraint, true);
assert.strictEqual(checkLocation.error.is.InvalidLocationConstraint, true);
} else {
assert.ifError(checkLocation.error);
assert.strictEqual(checkLocation.locationConstraint,

View File

@ -25,7 +25,7 @@ describe('bucketPutEncryption API', () => {
describe('test invalid sse configs', () => {
it('should reject an empty config', done => {
bucketPutEncryption(authInfo, templateRequest(bucketName, { post: '' }), log, err => {
assert.strictEqual(err.MalformedXML, true);
assert.strictEqual(err.is.MalformedXML, true);
done();
});
});
@ -36,7 +36,7 @@ describe('bucketPutEncryption API', () => {
<ServerSideEncryptionConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
</ServerSideEncryptionConfiguration>`,
}), log, err => {
assert.strictEqual(err.MalformedXML, true);
assert.strictEqual(err.is.MalformedXML, true);
done();
});
});
@ -48,7 +48,7 @@ describe('bucketPutEncryption API', () => {
<Rule></Rule>
</ServerSideEncryptionConfiguration>`,
}), log, err => {
assert.strictEqual(err.MalformedXML, true);
assert.strictEqual(err.is.MalformedXML, true);
done();
});
});
@ -56,7 +56,7 @@ describe('bucketPutEncryption API', () => {
it('should reject a config with no SSEAlgorithm', done => {
const post = templateSSEConfig({});
bucketPutEncryption(authInfo, templateRequest(bucketName, { post }), log, err => {
assert.strictEqual(err.MalformedXML, true);
assert.strictEqual(err.is.MalformedXML, true);
done();
});
});
@ -64,7 +64,7 @@ describe('bucketPutEncryption API', () => {
it('should reject a config with an invalid SSEAlgorithm', done => {
const post = templateSSEConfig({ algorithm: 'InvalidAlgo' });
bucketPutEncryption(authInfo, templateRequest(bucketName, { post }), log, err => {
assert.strictEqual(err.MalformedXML, true);
assert.strictEqual(err.is.MalformedXML, true);
done();
});
});
@ -72,7 +72,7 @@ describe('bucketPutEncryption API', () => {
it('should reject a config with SSEAlgorithm == AES256 and a provided KMSMasterKeyID', done => {
const post = templateSSEConfig({ algorithm: 'AES256', keyId: '12345' });
bucketPutEncryption(authInfo, templateRequest(bucketName, { post }), log, err => {
assert.strictEqual(err.InvalidArgument, true);
assert.strictEqual(err.is.InvalidArgument, true);
done();
});
});

View File

@ -48,7 +48,7 @@ describe('putBucketObjectLock API', () => {
it('should return InvalidBucketState error', done => {
bucketPutObjectLock(authInfo, putObjLockRequest, log, err => {
assert.strictEqual(err.InvalidBucketState, true);
assert.strictEqual(err.is.InvalidBucketState, true);
done();
});
});

View File

@ -70,7 +70,7 @@ describe('putBucketPolicy API', () => {
expectedBucketPolicy.Statement[0].Resource = 'arn:aws::s3:::badname';
bucketPutPolicy(authInfo, getPolicyRequest(expectedBucketPolicy),
log, err => {
assert.strictEqual(err.MalformedPolicy, true);
assert.strictEqual(err.is.MalformedPolicy, true);
assert.strictEqual(err.description, 'Policy has invalid resource');
return done();
});
@ -81,7 +81,7 @@ describe('putBucketPolicy API', () => {
{ StringEquals: { 's3:x-amz-acl': ['public-read'] } };
bucketPutPolicy(authInfo, getPolicyRequest(expectedBucketPolicy), log,
err => {
assert.strictEqual(err.NotImplemented, true);
assert.strictEqual(err.is.NotImplemented, true);
done();
});
});
@ -90,7 +90,7 @@ describe('putBucketPolicy API', () => {
expectedBucketPolicy.Statement[0].Principal = { Service: ['test.com'] };
bucketPutPolicy(authInfo, getPolicyRequest(expectedBucketPolicy), log,
err => {
assert.strictEqual(err.NotImplemented, true);
assert.strictEqual(err.is.NotImplemented, true);
done();
});
});
@ -100,7 +100,7 @@ describe('putBucketPolicy API', () => {
{ Federated: 'www.test.com' };
bucketPutPolicy(authInfo, getPolicyRequest(expectedBucketPolicy), log,
err => {
assert.strictEqual(err.NotImplemented, true);
assert.strictEqual(err.is.NotImplemented, true);
done();
});
});

View File

@ -13,7 +13,7 @@ function checkError(xml, expectedErr, cb) {
if (expectedErr === null) {
assert.strictEqual(err, null, `expected no error but got '${err}'`);
} else {
assert(err[expectedErr], 'incorrect error response: should be ' +
assert(err.is[expectedErr], 'incorrect error response: should be ' +
`'Error: ${expectedErr}' but got '${err}'`);
}
return cb();

View File

@ -1,6 +1,5 @@
const crypto = require('crypto');
const assert = require('assert');
const { errors } = require('arsenal');
const BucketInfo = require('arsenal').models.BucketInfo;
const bucketGet = require('../../../lib/api/bucketGet');
@ -96,7 +95,7 @@ function confirmDeleted(done) {
process.nextTick(() => {
process.nextTick(() => {
metadata.getBucket(bucketName, log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
return checkBucketListing(authInfo, bucketName, 0, done);
});
});
@ -138,7 +137,7 @@ describe('deleted flag bucket handling', () => {
'different account sends put bucket request for bucket with ' +
'deleted flag', done => {
bucketPut(otherAccountAuthInfo, baseTestRequest, log, err => {
assert.deepStrictEqual(err, errors.BucketAlreadyExists);
assert.strictEqual(err.is.BucketAlreadyExists, true);
metadata.getBucket(bucketName, log, (err, data) => {
assert.strictEqual(data._transient, false);
assert.strictEqual(data._deleted, true);
@ -193,7 +192,7 @@ describe('deleted flag bucket handling', () => {
'x-amz-acl': 'public-read' }, 'headers',
baseTestRequest, baseTestRequest.headers);
bucketPutACL(otherAccountAuthInfo, putACLRequest, log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
metadata.getBucket(bucketName, log, (err, data) => {
assert.strictEqual(data._deleted, true);
assert.strictEqual(data._transient, false);
@ -212,7 +211,7 @@ describe('deleted flag bucket handling', () => {
baseTestRequest, baseTestRequest.headers);
const unauthorizedAccount = makeAuthInfo('keepMeOut');
bucketPutACL(unauthorizedAccount, putACLRequest, log, err => {
assert.deepStrictEqual(err, errors.AccessDenied);
assert.strictEqual(err.is.AccessDenied, true);
metadata.getBucket(bucketName, log, (err, data) => {
assert.strictEqual(data._deleted, true);
assert.strictEqual(data._transient, false);
@ -266,7 +265,7 @@ describe('deleted flag bucket handling', () => {
const postBody = Buffer.from('I am a body', 'utf8');
const putObjRequest = new DummyRequest(setUpRequest, postBody);
objectPut(otherAccountAuthInfo, putObjRequest, undefined, log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
done();
});
});
@ -314,7 +313,7 @@ describe('deleted flag bucket handling', () => {
initiateRequest.objectKey = 'objectName';
initiateMultipartUpload(otherAccountAuthInfo, initiateRequest, log,
err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
done();
});
});
@ -331,7 +330,7 @@ describe('deleted flag bucket handling', () => {
'authorized', done => {
bucketDelete(otherAccountAuthInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.AccessDenied);
assert.strictEqual(err.is.AccessDenied, true);
done();
});
});
@ -340,7 +339,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketDeleteWebsite(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -349,7 +348,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketGet(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -358,7 +357,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketGetACL(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -367,7 +366,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketGetCors(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -383,7 +382,7 @@ describe('deleted flag bucket handling', () => {
bucketPutCorsRequest.headers['content-md5'] = crypto.createHash('md5')
.update(bucketPutCorsRequest.post, 'utf8').digest('base64');
bucketPutCors(authInfo, bucketPutCorsRequest, log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -391,7 +390,7 @@ describe('deleted flag bucket handling', () => {
it('bucketDeleteCors request on bucket with delete flag should return ' +
'NoSuchBucket error and complete deletion', done => {
bucketDeleteCors(authInfo, baseTestRequest, log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -400,7 +399,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketGetWebsite(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -414,7 +413,7 @@ describe('deleted flag bucket handling', () => {
'</WebsiteConfiguration>';
bucketPutWebsite(authInfo, bucketPutWebsiteRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -423,7 +422,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
bucketHead(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -438,13 +437,13 @@ describe('deleted flag bucket handling', () => {
if (extraArgNeeded) {
return apiAction(authInfo, mpuRequest, undefined,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchUpload);
assert.strictEqual(err.is.NoSuchUpload, true);
return done();
});
}
return apiAction(authInfo, mpuRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchUpload);
assert.strictEqual(err.is.NoSuchUpload, true);
return done();
});
}
@ -495,7 +494,7 @@ describe('deleted flag bucket handling', () => {
listRequest.query = {};
listMultipartUploads(authInfo, listRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
done();
});
});
@ -505,7 +504,7 @@ describe('deleted flag bucket handling', () => {
done => {
objectGet(authInfo, baseTestRequest, false,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -514,7 +513,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
objectGetACL(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -523,7 +522,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
objectHead(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -532,7 +531,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error and complete deletion', done => {
objectPutACL(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});
@ -541,7 +540,7 @@ describe('deleted flag bucket handling', () => {
'NoSuchBucket error', done => {
objectDelete(authInfo, baseTestRequest,
log, err => {
assert.deepStrictEqual(err, errors.NoSuchBucket);
assert.strictEqual(err.is.NoSuchBucket, true);
confirmDeleted(done);
});
});

View File

@ -5,8 +5,7 @@ const { parseString } = require('xml2js');
const BucketInfo = require('arsenal').models.BucketInfo;
const constants = require('../../../constants');
const { cleanup, DummyRequestLogger, makeAuthInfo } = require('../helpers');
const inMemMetadata
= require('../../../lib/metadata/in_memory/metadata').metadata;
const { metadata: inMemMetadata } = require('arsenal').storage.metadata.inMemory.metadata;
const listParts = require('../../../lib/api/listParts');
const metadata = require('../metadataswitch');

Some files were not shown because too many files have changed in this diff Show More