Compare commits

...

20 Commits

Author SHA1 Message Date
Lauren Spiegel c3e7ff96a8 change bucket name 2017-02-13 15:40:39 -08:00
Dora Korpar 5f4a9a27ce Fix get in file and mem backends 2017-02-13 11:25:22 -08:00
Dora Korpar dc2a5fe132 Delete works 2017-02-01 13:32:38 -08:00
Dora Korpar fda82c81c9 Get works and delete almost fully functional: 2017-01-31 18:00:28 -08:00
Dora Korpar f80cbbd2e9 Implement delete and start get 2017-01-27 17:35:30 -08:00
Lauren Spiegel 572a236a5d it works!! 2017-01-19 16:10:26 -08:00
Lauren Spiegel a1583a60db TO SQUASH: further on wrapper 2017-01-19 12:55:12 -08:00
Lauren Spiegel d8fc83a6d9 add mpu info to aws keys 2017-01-19 12:55:12 -08:00
Lauren Spiegel 8ccdb3f829 work on gateway and send objectKey in key context 2017-01-19 12:55:12 -08:00
Lauren Spiegel 6ad9c89cce handle multiple backend put separately in outer wrapper 2017-01-19 12:55:12 -08:00
Lauren Spiegel 7a0195f9ee remove scality specific gateway 2017-01-19 12:55:12 -08:00
Lauren Spiegel 069326b7dc validate backend info 2017-01-19 12:55:12 -08:00
Lauren Spiegel fcfcab1bac get backendInfo from remaining api's 2017-01-19 12:55:12 -08:00
Lauren Spiegel dd580f2b16 send locationConstraint/backendInfo to wrapper 2017-01-19 12:55:12 -08:00
Lauren Spiegel c75f17ff04 TO SQUASH: multiple for all 2017-01-19 12:55:12 -08:00
Lauren Spiegel b91575f9bd FT: Enable multiple data backends 2017-01-19 12:55:12 -08:00
Dora Korpar 31e0cde919 Update unit tests 2017-01-19 12:06:06 -08:00
Dora Korpar b382e96cc2 Change utils functions for new config 2017-01-18 16:30:08 -08:00
Dora Korpar 74a8e7cd9c Fix sproxyd assignment 2017-01-18 14:56:30 -08:00
Dora Korpar 94f97a6039 Set up config for multiple backends 2017-01-18 12:23:16 -08:00
18 changed files with 792 additions and 181 deletions

View File

@ -1,24 +1,67 @@
{ {
"port": 8000, "port": 8000,
"listenOn": [], "listenOn": [],
"regions": { "locationConstraints": {
"ap-northeast-1": ["s3.ap-northeast-1.amazonaws.com"], "aws-us-east-1": {
"ap-southeast-1": ["s3.ap-southeast-1.amazonaws.com"], "type": "aws_s3",
"ap-southeast-2": ["s3.ap-southeast-2.amazonaws.com"], "information": {
"eu-central-1": ["s3.eu-central-1.amazonaws.com", "region": "us-east-1",
"s3.eu.central-1.amazonaws.com"], "bucketName": "premadebucket",
"eu-west-1": ["s3.eu-west-1.amazonaws.com"], "credentialsProfile": "default"
"sa-east-1": ["s3.sa-east-1.amazonaws.com"], }
"us-east-1": ["s3.amazonaws.com", },
"s3-external-1.amazonaws.com", "aws-us-east-test": {
"s3.us-east-1.amazonaws.com"], "type": "aws_s3",
"us-west-1": ["s3.us-west-1.amazonaws.com"], "information": {
"us-west-2": ["s3-us-west-2.amazonaws.com"], "region": "us-east-1",
"us-gov-west-1": ["s3-us-gov-west-1.amazonaws.com", "endpoint": "s3.amazonaws.com",
"s3-fips-us-gov-west-1.amazonaws.com"], "bucketName": "multitester444",
"localregion": ["localhost"], "credentialsProfile": "default"
"test-region": ["s3.scality.test"], }
"docker-region": ["s3.docker.test"] },
"scality-us-east-1": {
"type": "scality_s3",
"information": {
"region": "us-east-1",
"connector": {
"sproxyd": {
"bootstrap": ["localhost:8181"]
}
}
}
},
"scality-us-west-1": {
"type": "scality_s3",
"information": {
"region": "us-west-1",
"connector": {
"sproxyd": {
"bootstrap": ["localhost:8182"]
}
}
}
},
"virtual-user-metadata": {
"type": "scality_user_metadata",
"information": {
}
},
"file": {
"type": "file",
"information": {
}
},
"mem": {
"type": "mem",
"information": {
}
}
},
"restEndpoints": {
"127.0.0.1": "scality-us-east-1",
"s3.docker.test": "scality-us-west-1",
"127.0.0.2": "aws-us-east-1",
"s3.amazonaws.com": "aws-us-east-1"
}, },
"websiteEndpoints": ["s3-website-us-east-1.amazonaws.com", "websiteEndpoints": ["s3-website-us-east-1.amazonaws.com",
"s3-website.us-east-2.amazonaws.com", "s3-website.us-east-2.amazonaws.com",
@ -34,9 +77,6 @@
"s3-website-sa-east-1.amazonaws.com", "s3-website-sa-east-1.amazonaws.com",
"s3-website.localhost", "s3-website.localhost",
"s3-website.scality.test"], "s3-website.scality.test"],
"sproxyd": {
"bootstrap": ["localhost:8181"]
},
"bucketd": { "bucketd": {
"bootstrap": ["localhost"] "bootstrap": ["localhost"]
}, },

View File

@ -66,4 +66,7 @@ export default {
// hex digest of sha256 hash of empty string: // hex digest of sha256 hash of empty string:
emptyStringHash: crypto.createHash('sha256') emptyStringHash: crypto.createHash('sha256')
.update('', 'binary').digest('hex'), .update('', 'binary').digest('hex'),
// user metadata header to set object locationConstraint
objectLocationConstraintHeader: 'x-amz-meta-scal-location-constraint',
}; };

View File

@ -40,7 +40,8 @@ function _setDirSyncFlag(path) {
fs.closeSync(pathFD2); fs.closeSync(pathFD2);
} }
if (config.backends.data !== 'file' && config.backends.metadata !== 'file') { if (config.backends.data !== 'file' && config.backends.data !== 'multiple'
|| config.backends.metadata === 'scality') {
logger.info('No init required. Go forth and store data.'); logger.info('No init required. Go forth and store data.');
process.exit(0); process.exit(0);
} }

View File

@ -9,6 +9,28 @@ import authDataChecker from './auth/in_memory/checker';
const defaultHealthChecks = { allowFrom: ['127.0.0.1/8', '::1'] }; const defaultHealthChecks = { allowFrom: ['127.0.0.1/8', '::1'] };
const defaultLocalCache = { host: '127.0.0.1', port: 6379 }; const defaultLocalCache = { host: '127.0.0.1', port: 6379 };
function sproxydAssert(configSproxyd, cb) {
if (configSproxyd.bootstrap !== undefined) {
assert(Array.isArray(configSproxyd.bootstrap)
&& configSproxyd.bootstrap
.every(e => typeof e === 'string'),
'bad config: sproxyd.bootstrap must be a list of strings');
assert(configSproxyd.bootstrap.length > 0,
'sproxyd bootstrap list is empty');
// thisSproxyd.bootstrap = configSproxyd.bootstrap;
cb('bootstrap');
}
if (configSproxyd.chordCos !== undefined) {
assert(typeof configSproxyd.chordCos === 'string',
'bad config: sproxyd.chordCos must be a string');
assert(configSproxyd.chordCos.match(/^[0-9a-fA-F]{2}$/),
'bad config: sproxyd.chordCos must be a 2hex-chars string');
// thisSproxyd.chordCos =
// Number.parseInt(configSproxyd.chordCos, 16);
cb('chordCos');
}
}
/** /**
* Reads from a config file and returns the content as a config object * Reads from a config file and returns the content as a config object
*/ */
@ -67,13 +89,75 @@ class Config {
}); });
} }
assert(typeof config.regions === 'object', // legacy
'bad config: the list of regions is mandatory'); if (config.regions !== undefined) {
assert(Object.keys(config.regions).every( assert(typeof config.regions === 'object',
r => typeof r === 'string' && config.regions[r] instanceof Array 'bad config: the list of regions is mandatory');
&& config.regions[r].every(e => typeof e === 'string')), assert(Object.keys(config.regions).every(
'bad config: regions must be a set of {region: [endpoints]}'); r => typeof r === 'string' && config.regions[r] instanceof Array
this.regions = config.regions; && config.regions[r].every(e => typeof e === 'string')),
'bad config: regions must be a set of {region: [endpoints]}');
this.regions = config.regions;
}
this.locationConstraints = {};
if (config.locationConstraints !== undefined) {
assert(typeof config.locationConstraints === 'object',
'bad config: locationConstraints must be an object');
Object.keys(config.locationConstraints).forEach(l => {
assert(typeof config.locationConstraints[l].type === 'string' &&
typeof config.locationConstraints[l].information
=== 'object',
'bad config: locationConstraints.type and ' +
'locationConstraints.information are mandatory- type ' +
'must be a string and information must be an object');
this.locationConstraints[l] = config.locationConstraints[l];
this.locationConstraints[l].type =
config.locationConstraints[l].type;
const info = config.locationConstraints[l].information;
const stringFields = [
'region',
'bucketName',
'credentialsProfile',
];
stringFields.forEach(field => {
if (info.field !== undefined) {
assert(typeof info.field === 'string',
`bad config: ${field} must be a string`);
this.locationConstraints[l].information.field =
info.field;
}
});
if (info.connector !== undefined) {
assert(typeof info.connector === 'object',
'bad config: connector must be an object');
if (info.connector.sproxyd !== undefined) {
sproxydAssert(info.connector.sproxyd, field => {
if (field === 'chordCos') {
this.locationConstraints[l].information
.connector.sproxyd[field] =
Number.parseInt(info.connector.
sproxyd[field], 16);
} else {
this.locationConstraints[l].information
.connector.sproxyd[field] =
info.connector.sproxyd[field];
}
});
}
}
});
}
this.restEndpoints = {};
if (config.restEndpoints !== undefined) {
assert(typeof config.restEndpoints === 'object',
'bad config: restEndpoints must be an object of endpoints');
assert(Object.keys(config.restEndpoints).every(
r => typeof config.restEndpoints[r] === 'string'),
'bad config: each endpoint must be a string');
this.restEndpoints = config.restEndpoints;
}
this.websiteEndpoints = []; this.websiteEndpoints = [];
if (config.websiteEndpoints !== undefined) { if (config.websiteEndpoints !== undefined) {
@ -95,25 +179,17 @@ class Config {
assert(typeof config.usEastBehavior === 'boolean'); assert(typeof config.usEastBehavior === 'boolean');
this.usEastBehavior = config.usEastBehavior; this.usEastBehavior = config.usEastBehavior;
} }
// legacy
this.sproxyd = { bootstrap: [] }; this.sproxyd = { bootstrap: [] };
if (config.sproxyd !== undefined) { if (config.sproxyd !== undefined) {
if (config.sproxyd.bootstrap !== undefined) { sproxydAssert(config.sproxyd, field => {
assert(Array.isArray(config.sproxyd.bootstrap) if (field === 'chordCos') {
&& config.sproxyd.bootstrap this.sproxyd[field] =
.every(e => typeof e === 'string'), Number.parseInt(config.sproxyd[field], 16);
'bad config: sproxyd.bootstrap must be a list of strings'); } else {
assert(config.sproxyd.bootstrap.length > 0, this.sproxyd[field] = config.sproxyd[field];
'sproxyd bootstrap list is empty'); }
this.sproxyd.bootstrap = config.sproxyd.bootstrap; });
}
if (config.sproxyd.chordCos !== undefined) {
assert(typeof config.sproxyd.chordCos === 'string',
'bad config: sproxyd.chordCos must be a string');
assert(config.sproxyd.chordCos.match(/^[0-9a-fA-F]{2}$/),
'bad config: sproxyd.chordCos must be a 2hex-chars string');
this.sproxyd.chordCos =
Number.parseInt(config.sproxyd.chordCos, 16);
}
} }
this.bucketd = { bootstrap: [] }; this.bucketd = { bootstrap: [] };
@ -329,8 +405,19 @@ class Config {
} }
this.authData = authData; this.authData = authData;
} }
if (process.env.S3SPROXYD) { if (process.env.S3DATA) {
data = process.env.S3SPROXYD; const validData = ['mem', 'file', 'scality', 'multiple'];
assert(validData.indexOf(process.env.S3DATA) > -1,
'bad environment variable: S3DATA environment variable ' +
'should be one of mem/file/scality/multiple'
);
if (process.env.S3DATA === 'multiple') {
assert(config.locationConstraints !== undefined,
'for multiple data backends, locationConstraints ' +
'must be set'
);
}
data = process.env.S3DATA;
} }
if (process.env.S3METADATA) { if (process.env.S3METADATA) {
metadata = process.env.S3METADATA; metadata = process.env.S3METADATA;

View File

@ -0,0 +1,136 @@
import config from '../../../Config';
export class BackendInfo {
/**
* Represents the info necessary to evaluate which data backend to use
* on a data put call.
* @constructor
* @param {string | undefined} objectLocationConstraint - location constraint
* for object based on user meta header
* @param {string | undefined } bucketLocationConstraint - location
* constraint for bucket based on bucket metadata
* @param {string} requestEndpoint - endpoint to which request was made
*/
constructor(objectLocationConstraint, bucketLocationConstraint,
requestEndpoint) {
this._objectLocationConstraint = objectLocationConstraint;
this._bucketLocationConstraint = bucketLocationConstraint;
this._requestEndpoint = requestEndpoint;
return this;
}
/**
* validate proposed location constraint against config
* @param {string | undefined} locationConstraint - value of user
* metadata location constraint header or bucket location constraint
* @param {object} log - werelogs logger
* @return {boolean} - true if valid, false if not
*/
static isValidLocationConstraint(locationConstraint, log) {
if (Object.keys(config.locationConstraints)
.indexOf(locationConstraint) < 0) {
log.trace('proposed locationConstraint is invalid',
{ locationConstraint });
return false;
}
return true;
}
/**
* validate requestEndpoint against config
* if there is a mismatch between the request endpoint and what is in
* the config, this could cause a problem for setting the backend location
* for storing data
* @param {string} requestEndpoint - request endpoint
* @param {object} log - werelogs logger
* @return {boolean} - true if valid, false if not
*/
static isValidRequestEndpoint(requestEndpoint, log) {
if (Object.keys(config.restEndpoints)
.indexOf(requestEndpoint) < 0) {
log.trace('requestEndpoint does not match config restEndpoints',
{ requestEndpoint });
return false;
}
if (Object.keys(config.locationConstraints).indexOf(config
.restEndpoints[requestEndpoint]) < 0) {
log.trace('the default locationConstraint for requestEndpoint ' +
'does not match any config locationConstraint',
{ requestEndpoint });
return false;
}
return true;
}
/**
* validate proposed BackendInfo parameters
* @param {string | undefined} objectLocationConstraint - value of user
* metadata location constraint header
* @param {string | null} bucketLocationConstraint - location
* constraint from bucket metadata
* @param {string} requestEndpoint - endpoint of request
* @param {object} log - werelogs logger
* @return {boolean} - true if valid, false if not
*/
static areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint, log) {
if (objectLocationConstraint &&
!BackendInfo.isValidLocationConstraint(objectLocationConstraint,
log)) {
log.trace('objectLocationConstraint is invalid');
return false;
}
if (bucketLocationConstraint &&
!BackendInfo.isValidLocationConstraint(bucketLocationConstraint,
log)) {
log.trace('bucketLocationConstraint is invalid');
return false;
}
if (!BackendInfo.isValidRequestEndpoint(requestEndpoint, log)) {
return false;
}
return true;
}
/**
* Return objectLocationConstraint
* @return {string | undefined} objectLocationConstraint;
*/
getObjectLocationConstraint() {
return this._objectLocationConstraint;
}
/**
* Return bucketLocationConstraint
* @return {string | undefined} bucketLocationConstraint;
*/
getBucketLocationConstraint() {
return this._bucketLocationConstraint;
}
/**
* Return requestEndpoint
* @return {string} requestEndpoint;
*/
getRequestEndpoint() {
return this._requestEndpoint;
}
/**
* Return locationConstraint that should be used with put request
* Order of priority is:
* (1) objectLocationConstraint,
* (2) bucketLocationConstraint,
* (3) default locationConstraint for requestEndpoint
* @return {string} locationConstraint;
*/
getControllingLocationConstraint() {
if (this._objectLocationConstraint) {
return this.getObjectLocationConstraint();
}
if (this._bucketLocationConstraint) {
return this.getBucketLocationConstraint();
}
return config.restEndpoints[this.getRequestEndpoint];
}
}

View File

@ -55,7 +55,7 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
} }
/** /**
* Stores object and responds back with location and storage type * Stores object and responds back with key and storage type
* @param {object} objectContext - object's keyContext for sproxyd Key * @param {object} objectContext - object's keyContext for sproxyd Key
* computation (put API) * computation (put API)
* @param {object} cipherBundle - cipher bundle that encrypt the data * @param {object} cipherBundle - cipher bundle that encrypt the data
@ -64,14 +64,16 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
* @param {object | null } streamingV4Params - if v4 auth, object containing * @param {object | null } streamingV4Params - if v4 auth, object containing
* accessKey, signatureFromRequest, region, scopeDate, timestamp, and * accessKey, signatureFromRequest, region, scopeDate, timestamp, and
* credentialScope (to be used for streaming v4 auth if applicable) * credentialScope (to be used for streaming v4 auth if applicable)
* @param {BackendInfo} backendInfo - info to determine which data
* backend to use
* @param {RequestLogger} log - the current stream logger * @param {RequestLogger} log - the current stream logger
* @param {function} cb - callback containing result for the next task * @param {function} cb - callback containing result for the next task
* @return {undefined} * @return {undefined}
*/ */
export function dataStore(objectContext, cipherBundle, stream, size, export function dataStore(objectContext, cipherBundle, stream, size,
streamingV4Params, log, cb) { streamingV4Params, backendInfo, log, cb) {
const dataStream = prepareStream(stream, streamingV4Params, log, cb); const dataStream = prepareStream(stream, streamingV4Params, log, cb);
data.put(cipherBundle, dataStream, size, objectContext, log, data.put(cipherBundle, dataStream, size, objectContext, backendInfo, log,
(err, dataRetrievalInfo, hashedStream) => { (err, dataRetrievalInfo, hashedStream) => {
if (err) { if (err) {
log.error('error in datastore', { log.error('error in datastore', {

View File

@ -1,6 +1,8 @@
import async from 'async'; import async from 'async';
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import { BackendInfo } from './apiUtils/object/BackendInfo';
import constants from '../../constants';
import data from '../data/wrapper'; import data from '../data/wrapper';
import kms from '../kms/wrapper'; import kms from '../kms/wrapper';
import { logger } from '../utilities/logger'; import { logger } from '../utilities/logger';
@ -128,6 +130,7 @@ function objectCopy(authInfo, request, sourceBucket,
bucketName: destBucketName, bucketName: destBucketName,
owner: authInfo.getCanonicalID(), owner: authInfo.getCanonicalID(),
namespace: request.namespace, namespace: request.namespace,
objectKey: destObjectKey,
}; };
const websiteRedirectHeader = const websiteRedirectHeader =
request.headers['x-amz-website-redirect-location']; request.headers['x-amz-website-redirect-location'];
@ -216,6 +219,22 @@ function objectCopy(authInfo, request, sourceBucket,
return next(null, storeMetadataParams, dataLocator, destObjMD, return next(null, storeMetadataParams, dataLocator, destObjMD,
serverSideEncryption); serverSideEncryption);
} }
const objectLocationConstraint = storeMetadataParams
.metaHeaders[constants.objectLocationConstraintHeader];
const bucketLocationConstraint = destBucketMD
.getLocationConstraint();
const requestEndpoint = request.parsedHost;
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint)) {
return process.nextTick(() => {
next(errors.InvalidArgument
.customizeDescription('Location Constraint Info is ' +
'invalid.'));
});
}
const backendInfo = new BackendInfo(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint);
// dataLocator is an array. need to get and put all parts // dataLocator is an array. need to get and put all parts
// For now, copy 1 part at a time. Could increase the second // For now, copy 1 part at a time. Could increase the second
// argument here to increase the number of parts // argument here to increase the number of parts
@ -236,7 +255,8 @@ function objectCopy(authInfo, request, sourceBucket,
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return data.put(cipherBundle, stream, return data.put(cipherBundle, stream,
part.size, dataStoreContext, log, part.size, dataStoreContext,
backendInfo, log,
(error, partRetrievalInfo) => { (error, partRetrievalInfo) => {
if (error) { if (error) {
return cb(error); return cb(error);
@ -258,8 +278,10 @@ function objectCopy(authInfo, request, sourceBucket,
} }
// Copied object is not encrypted so just put it // Copied object is not encrypted so just put it
// without a cipherBundle // without a cipherBundle
return data.put(null, stream, part.size, return data.put(null, stream, part.size,
dataStoreContext, log, (error, partRetrievalInfo) => { dataStoreContext, backendInfo,
log, (error, partRetrievalInfo) => {
if (error) { if (error) {
return cb(error); return cb(error);
} }

View File

@ -1,5 +1,7 @@
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import { BackendInfo } from './apiUtils/object/BackendInfo';
import constants from '../../constants';
import services from '../services'; import services from '../services';
import validateHeaders from '../utilities/validateHeaders'; import validateHeaders from '../utilities/validateHeaders';
import { pushMetric } from '../utapi/utilities'; import { pushMetric } from '../utapi/utilities';
@ -51,8 +53,21 @@ export default function objectDelete(authInfo, request, log, cb) {
contentLength: objMD['content-length'], contentLength: objMD['content-length'],
}); });
} }
return services.deleteObject(bucketName, objMD, objectKey, log, const objectLocationConstraint = request
err => { .headers[constants.objectLocationConstraintHeader];
const bucketLocationConstraint = bucket.getLocationConstraint();
const requestEndpoint = request.parsedHost;
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint)) {
return process.nextTick(() => {
cb(errors.InvalidArgument
.customizeDescription('Location Constraint Info is ' +
'invalid.'));
});
}
return services.deleteObject(bucketName, objMD, objectKey,
log, err => {
if (err) { if (err) {
return cb(err); return cb(err);
} }

View File

@ -1,5 +1,6 @@
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import { BackendInfo } from './apiUtils/object/BackendInfo';
import data from '../data/wrapper'; import data from '../data/wrapper';
import services from '../services'; import services from '../services';
import aclUtils from '../utilities/aclUtils'; import aclUtils from '../utilities/aclUtils';
@ -26,7 +27,7 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
}); });
} }
function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID, function _storeIt(bucket, objectKey, objMD, authInfo, canonicalID,
cipherBundle, request, streamingV4Params, log, callback) { cipherBundle, request, streamingV4Params, log, callback) {
const size = request.parsedContentLength; const size = request.parsedContentLength;
@ -41,10 +42,12 @@ function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
const metaHeaders = utils.getMetaHeaders(request.headers); const metaHeaders = utils.getMetaHeaders(request.headers);
log.trace('meta headers', { metaHeaders, method: 'objectPut' }); log.trace('meta headers', { metaHeaders, method: 'objectPut' });
const bucketName = bucket.getName();
const objectKeyContext = { const objectKeyContext = {
bucketName, bucketName,
owner: canonicalID, owner: canonicalID,
namespace: request.namespace, namespace: request.namespace,
objectKey,
}; };
// If the request was made with a pre-signed url, the x-amz-acl 'header' // If the request was made with a pre-signed url, the x-amz-acl 'header'
// might be in the query string rather than the actual headers so include // might be in the query string rather than the actual headers so include
@ -81,8 +84,23 @@ function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
log.trace('storing object in data', { log.trace('storing object in data', {
method: 'services.metadataValidateAuthorization', method: 'services.metadataValidateAuthorization',
}); });
const objectLocationConstraint = request
.headers[constants.objectLocationConstraintHeader];
const bucketLocationConstraint = bucket.getLocationConstraint();
const requestEndpoint = request.parsedHost;
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint)) {
return process.nextTick(() => {
callback(errors.InvalidArgument
.customizeDescription('Location Constraint Info is ' +
'invalid.'));
});
}
const backendInfo = new BackendInfo(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint);
return dataStore(objectKeyContext, cipherBundle, request, size, return dataStore(objectKeyContext, cipherBundle, request, size,
streamingV4Params, log, (err, dataGetInfo, calculatedHash) => { streamingV4Params, backendInfo, log,
(err, dataGetInfo, calculatedHash) => {
if (err) { if (err) {
log.trace('error from data', { log.trace('error from data', {
error: err, error: err,
@ -225,13 +243,13 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
if (err) { if (err) {
return callback(errors.InternalError); return callback(errors.InternalError);
} }
return _storeIt(bucketName, objectKey, return _storeIt(bucket, objectKey,
objMD, authInfo, canonicalID, objMD, authInfo, canonicalID,
cipherBundle, request, cipherBundle, request,
streamingV4Params, log, callback); streamingV4Params, log, callback);
}); });
} }
return _storeIt(bucketName, objectKey, objMD, return _storeIt(bucket, objectKey, objMD,
authInfo, canonicalID, null, request, authInfo, canonicalID, null, request,
streamingV4Params, log, callback); streamingV4Params, log, callback);
}); });
@ -243,12 +261,12 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
if (err) { if (err) {
return callback(errors.InternalError); return callback(errors.InternalError);
} }
return _storeIt(bucketName, objectKey, objMD, return _storeIt(bucket, objectKey, objMD,
authInfo, canonicalID, cipherBundle, authInfo, canonicalID, cipherBundle,
request, streamingV4Params, log, callback); request, streamingV4Params, log, callback);
}); });
} }
return _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID, return _storeIt(bucket, objectKey, objMD, authInfo, canonicalID,
null, request, streamingV4Params, log, callback); null, request, streamingV4Params, log, callback);
}); });
} }

View File

@ -1,6 +1,7 @@
import async from 'async'; import async from 'async';
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import { BackendInfo } from './apiUtils/object/BackendInfo';
import constants from '../../constants'; import constants from '../../constants';
import data from '../data/wrapper'; import data from '../data/wrapper';
import kms from '../kms/wrapper'; import kms from '../kms/wrapper';
@ -30,6 +31,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
log.debug('processing request', { method: 'objectPutCopyPart' }); log.debug('processing request', { method: 'objectPutCopyPart' });
const destBucketName = request.bucketName; const destBucketName = request.bucketName;
const destObjectKey = request.objectKey; const destObjectKey = request.objectKey;
const mpuBucketName = `${constants.mpuBucketPrefix}${destBucketName}`;
const valGetParams = { const valGetParams = {
authInfo, authInfo,
bucketName: sourceBucket, bucketName: sourceBucket,
@ -74,6 +76,9 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
bucketName: destBucketName, bucketName: destBucketName,
owner: authInfo.getCanonicalID(), owner: authInfo.getCanonicalID(),
namespace: request.namespace, namespace: request.namespace,
objectKey: destObjectKey,
partNumber: paddedPartNumber,
uploadId,
}; };
return async.waterfall([ return async.waterfall([
@ -123,29 +128,83 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
copyObjectSize); copyObjectSize);
}); });
}, },
function checkMPUBucketAuth(dataLocator, destBucketMD, // get MPU shadow bucket to get splitter based on MD version
function getMpuShadowBucket(dataLocator, destBucketMD,
copyObjectSize, next) { copyObjectSize, next) {
return services.metadataValidateMultipart(valMPUParams, return metadata.getBucket(mpuBucketName, log,
(err, mpuBucket) => { (err, mpuBucket) => {
if (err && err.NoSuchBucket) {
return next(errors.NoSuchUpload);
}
if (err) { if (err) {
log.trace('error authorizing based on mpu bucket', log.error('error getting the shadow mpu bucket', {
{ error: err }); error: err,
method: 'objectPutCopyPart::metadata.getBucket',
});
return next(err); return next(err);
} }
return next(null, dataLocator, let splitter = constants.splitter;
destBucketMD, mpuBucket, copyObjectSize); if (mpuBucket.getMdBucketModelVersion() < 2) {
splitter = constants.oldSplitter;
}
return next(null, dataLocator, destBucketMD,
copyObjectSize, splitter);
}); });
}, },
// Get MPU overview object to check authorization to put a part
// and to get any object location constraint info
function getMpuOverviewObject(dataLocator, destBucketMD,
copyObjectSize, splitter, next) {
const mpuOverviewKey =
`overview${splitter}${destObjectKey}${splitter}${uploadId}`;
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, log,
(err, res) => {
if (err) {
log.error('error getting overview object from ' +
'mpu bucket', {
error: err,
method: 'objectPutCopyPart::' +
'metadata.getObjectMD',
});
return next(err);
}
const initiatorID = res.initiator.ID;
const requesterID = authInfo.isRequesterAnIAMUser() ?
authInfo.getArn() : authInfo.getCanonicalID();
if (initiatorID !== requesterID) {
return next(errors.AccessDenied);
}
const objectLocationConstraint =
res[constants.objectLocationConstraintHeader];
return next(null, dataLocator, destBucketMD,
objectLocationConstraint, copyObjectSize, next);
});
},
function goGetData(dataLocator, destBucketMD, function goGetData(dataLocator, destBucketMD,
mpuBucket, copyObjectSize, next) { objectLocationConstraint, copyObjectSize, next) {
const serverSideEncryption = destBucketMD.getServerSideEncryption(); const serverSideEncryption = destBucketMD.getServerSideEncryption();
// skip if 0 byte object // skip if 0 byte object
if (dataLocator.length === 0) { if (dataLocator.length === 0) {
return next(null, [], constants.emptyFileMd5, return process.nextTick(() => {
copyObjectSize, mpuBucket, next(null, [], constants.emptyFileMd5,
serverSideEncryption); copyObjectSize, serverSideEncryption);
});
} }
const bucketLocationConstraint = destBucketMD
.getLocationConstraint();
const requestEndpoint = request.parsedHost;
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint)) {
return process.nextTick(() => {
next(errors.InvalidArgument
.customizeDescription('Location Constraint Info is ' +
'invalid.'));
});
}
const backendInfo = new BackendInfo(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint);
// totalHash will be sent through the RelayMD5Sum transform streams // totalHash will be sent through the RelayMD5Sum transform streams
// to collect the md5 from multiple streams // to collect the md5 from multiple streams
let totalHash; let totalHash;
@ -178,7 +237,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return data.put(cipherBundle, hashedStream, return data.put(cipherBundle, hashedStream,
numberPartSize, dataStoreContext, log, numberPartSize, dataStoreContext,
backendInfo, log,
(error, partRetrievalInfo) => { (error, partRetrievalInfo) => {
if (error) { if (error) {
log.debug('error putting ' + log.debug('error putting ' +
@ -210,7 +270,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
// Copied object is not encrypted so just put it // Copied object is not encrypted so just put it
// without a cipherBundle // without a cipherBundle
return data.put(null, hashedStream, numberPartSize, return data.put(null, hashedStream, numberPartSize,
dataStoreContext, log, (error, partRetrievalInfo) => { dataStoreContext, backendInfo,
log, (error, partRetrievalInfo) => {
if (error) { if (error) {
log.debug('error putting object part', log.debug('error putting object part',
{ error }); { error });
@ -234,15 +295,14 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
// Digest the final combination of all of the part streams // Digest the final combination of all of the part streams
totalHash = totalHash.digest('hex'); totalHash = totalHash.digest('hex');
return next(null, locations, totalHash, return next(null, locations, totalHash,
copyObjectSize, mpuBucket, copyObjectSize, serverSideEncryption);
serverSideEncryption);
}); });
}, },
function getExistingPartInfo(locations, totalHash, function getExistingPartInfo(locations, totalHash,
copyObjectSize, mpuBucket, serverSideEncryption, next) { copyObjectSize, serverSideEncryption, next) {
const partKey = const partKey =
`${uploadId}${constants.splitter}${paddedPartNumber}`; `${uploadId}${constants.splitter}${paddedPartNumber}`;
metadata.getObjectMD(mpuBucket.getName(), partKey, log, metadata.getObjectMD(mpuBucketName, partKey, log,
(err, result) => { (err, result) => {
// If there is nothing being overwritten just move on // If there is nothing being overwritten just move on
if (err && !err.NoSuchKey) { if (err && !err.NoSuchKey) {
@ -260,12 +320,11 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
oldLocations : [oldLocations]; oldLocations : [oldLocations];
} }
return next(null, locations, totalHash, return next(null, locations, totalHash,
copyObjectSize, mpuBucket, serverSideEncryption, copyObjectSize, serverSideEncryption, oldLocations);
oldLocations);
}); });
}, },
function storeNewPartMetadata(locations, totalHash, function storeNewPartMetadata(locations, totalHash,
copyObjectSize, mpuBucket, serverSideEncryption, copyObjectSize, serverSideEncryption,
oldLocations, next) { oldLocations, next) {
const lastModified = new Date().toJSON(); const lastModified = new Date().toJSON();
const metaStoreParams = { const metaStoreParams = {
@ -276,7 +335,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
splitter: constants.splitter, splitter: constants.splitter,
lastModified, lastModified,
}; };
return services.metadataStorePart(mpuBucket.getName(), return services.metadataStorePart(mpuBucketName,
locations, metaStoreParams, log, err => { locations, metaStoreParams, log, err => {
if (err) { if (err) {
log.debug('error storing new metadata', log.debug('error storing new metadata',

View File

@ -2,6 +2,7 @@ import assert from 'assert';
import async from 'async'; import async from 'async';
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import { BackendInfo } from './apiUtils/object/BackendInfo';
import constants from '../../constants'; import constants from '../../constants';
import data from '../data/wrapper'; import data from '../data/wrapper';
import kms from '../kms/wrapper'; import kms from '../kms/wrapper';
@ -81,33 +82,35 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
const objectKey = request.objectKey; const objectKey = request.objectKey;
return async.waterfall([ return async.waterfall([
// Get the destination bucket. // Get the destination bucket.
next => metadata.getBucket(bucketName, log, (err, bucket) => { next => metadata.getBucket(bucketName, log,
if (err && err.NoSuchBucket) { (err, destinationBucket) => {
return next(errors.NoSuchBucket); if (err && err.NoSuchBucket) {
} return next(errors.NoSuchBucket);
if (err) { }
log.error('error getting the destination bucket', { if (err) {
error: err, log.error('error getting the destination bucket', {
method: 'objectPutPart::metadata.getBucket', error: err,
}); method: 'objectPutPart::metadata.getBucket',
return next(err); });
} return next(err);
return next(null, bucket); }
}), return next(null, destinationBucket);
}),
// Check the bucket authorization. // Check the bucket authorization.
(bucket, next) => { (destinationBucket, next) => {
// For validating the request at the destinationBucket level the // For validating the request at the destinationBucket level the
// `requestType` is the general 'objectPut'. // `requestType` is the general 'objectPut'.
const requestType = 'objectPut'; const requestType = 'objectPut';
if (!isBucketAuthorized(bucket, requestType, canonicalID)) { if (!isBucketAuthorized(destinationBucket, requestType,
canonicalID)) {
log.debug('access denied for user on bucket', { requestType }); log.debug('access denied for user on bucket', { requestType });
return next(errors.AccessDenied); return next(errors.AccessDenied);
} }
return next(null, bucket); return next(null, destinationBucket);
}, },
// Get bucket server-side encryption, if it exists. // Get bucket server-side encryption, if it exists.
(bucket, next) => { (destinationBucket, next) => {
const encryption = bucket.getServerSideEncryption(); const encryption = destinationBucket.getServerSideEncryption();
// If bucket has server-side encryption, pass the `res` value // If bucket has server-side encryption, pass the `res` value
if (encryption) { if (encryption) {
return kms.createCipherBundle(encryption, log, (err, res) => { return kms.createCipherBundle(encryption, log, (err, res) => {
@ -118,14 +121,15 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
}); });
return next(err); return next(err);
} }
return next(null, res); return next(null, destinationBucket, res);
}); });
} }
// The bucket does not have server-side encryption, so pass `null` // The bucket does not have server-side encryption, so pass `null`
return next(null, null); return next(null, destinationBucket, null);
}, },
// Get the MPU shadow bucket. // Get the MPU shadow bucket.
(cipherBundle, next) => metadata.getBucket(mpuBucketName, log, (destinationBucket, cipherBundle, next) =>
metadata.getBucket(mpuBucketName, log,
(err, mpuBucket) => { (err, mpuBucket) => {
if (err && err.NoSuchBucket) { if (err && err.NoSuchBucket) {
return next(errors.NoSuchUpload); return next(errors.NoSuchUpload);
@ -142,10 +146,10 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
if (mpuBucket.getMdBucketModelVersion() < 2) { if (mpuBucket.getMdBucketModelVersion() < 2) {
splitter = constants.oldSplitter; splitter = constants.oldSplitter;
} }
return next(null, cipherBundle, splitter); return next(null, destinationBucket, cipherBundle, splitter);
}), }),
// Check authorization of the MPU shadow bucket. // Check authorization of the MPU shadow bucket.
(cipherBundle, splitter, next) => { (destinationBucket, cipherBundle, splitter, next) => {
const mpuOverviewKey = _getOverviewKey(splitter, objectKey, const mpuOverviewKey = _getOverviewKey(splitter, objectKey,
uploadId); uploadId);
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, log, return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, log,
@ -163,11 +167,16 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
if (initiatorID !== requesterID) { if (initiatorID !== requesterID) {
return next(errors.AccessDenied); return next(errors.AccessDenied);
} }
return next(null, cipherBundle, splitter); const objectLocationConstraint =
res[constants.objectLocationConstraintHeader];
return next(null, destinationBucket,
objectLocationConstraint,
cipherBundle, splitter);
}); });
}, },
// Get any pre-existing part. // Get any pre-existing part.
(cipherBundle, splitter, next) => { (destinationBucket, objectLocationConstraint, cipherBundle,
splitter, next) => {
const paddedPartNumber = _getPaddedPartNumber(partNumber); const paddedPartNumber = _getPaddedPartNumber(partNumber);
const partKey = _getPartKey(uploadId, splitter, paddedPartNumber); const partKey = _getPartKey(uploadId, splitter, paddedPartNumber);
return metadata.getObjectMD(mpuBucketName, partKey, log, return metadata.getObjectMD(mpuBucketName, partKey, log,
@ -192,19 +201,39 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
oldLocations = Array.isArray(res.partLocations) ? oldLocations = Array.isArray(res.partLocations) ?
res.partLocations : [res.partLocations]; res.partLocations : [res.partLocations];
} }
return next(null, cipherBundle, partKey, prevObjectSize, return next(null, destinationBucket,
oldLocations); objectLocationConstraint, cipherBundle,
partKey, prevObjectSize, oldLocations);
}); });
}, },
// Store in data backend. // Store in data backend.
(cipherBundle, partKey, prevObjectSize, oldLocations, next) => { (destinationBucket, objectLocationConstraint, cipherBundle,
partKey, prevObjectSize, oldLocations, next) => {
const objectKeyContext = { const objectKeyContext = {
bucketName, bucketName,
owner: canonicalID, owner: canonicalID,
namespace: request.namespace, namespace: request.namespace,
objectKey,
partNumber: _getPaddedPartNumber(partNumber),
uploadId,
}; };
const bucketLocationConstraint = destinationBucket
.getLocationConstraint();
const requestEndpoint = request.parsedHost;
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint)) {
return process.nextTick(() => {
next(errors.InvalidArgument
.customizeDescription('Location Constraint Info is ' +
'invalid.'));
});
}
const backendInfo = new BackendInfo(objectLocationConstraint,
bucketLocationConstraint, requestEndpoint);
return dataStore(objectKeyContext, cipherBundle, request, size, return dataStore(objectKeyContext, cipherBundle, request, size,
streamingV4Params, log, (err, dataGetInfo, hexDigest) => { streamingV4Params, backendInfo, log,
(err, dataGetInfo, hexDigest) => {
if (err) { if (err) {
return next(err); return next(err);
} }

View File

@ -65,7 +65,7 @@ export const backend = {
return callback(errors.InternalError); return callback(errors.InternalError);
} }
log.debug('finished writing data', { key }); log.debug('finished writing data', { key });
return callback(null, key); return callback(null, { key, dataStoreName: 'file' });
}); });
return undefined; return undefined;
}); });
@ -73,7 +73,8 @@ export const backend = {
}); });
}, },
get: function getFile(key, range, reqUids, callback) { get: function getFile(objectGetInfo, range, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
const log = createLogger(reqUids); const log = createLogger(reqUids);
const filePath = getFilePath(key); const filePath = getFilePath(key);
log.debug('opening readStream to get data', { filePath }); log.debug('opening readStream to get data', { filePath });
@ -95,7 +96,8 @@ export const backend = {
.on('open', () => { callback(null, rs); }); .on('open', () => { callback(null, rs); });
}, },
delete: function delFile(key, reqUids, callback) { delete: function delFile(objectGetInfo, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
const log = createLogger(reqUids); const log = createLogger(reqUids);
const filePath = getFilePath(key); const filePath = getFilePath(key);
log.debug('deleting file', { filePath }); log.debug('deleting file', { filePath });

View File

@ -43,12 +43,13 @@ export const backend = {
callback(errors.InternalError); callback(errors.InternalError);
} else { } else {
ds[count] = { value, keyContext }; ds[count] = { value, keyContext };
callback(null, count++); callback(null, { key: count++, dataStoreName: 'mem' });
} }
}); });
}, },
get: function getMem(key, range, reqUids, callback) { get: function getMem(objectGetInfo, range, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
process.nextTick(() => { process.nextTick(() => {
if (!ds[key]) { return callback(errors.NoSuchKey); } if (!ds[key]) { return callback(errors.NoSuchKey); }
const storedBuffer = ds[key].value; const storedBuffer = ds[key].value;
@ -80,7 +81,8 @@ export const backend = {
}); });
}, },
delete: function delMem(key, reqUids, callback) { delete: function delMem(objectGetInfo, reqUids, callback) {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
process.nextTick(() => { process.nextTick(() => {
delete ds[key]; delete ds[key];
return callback(null); return callback(null);

View File

@ -0,0 +1,170 @@
import AWS from 'aws-sdk';
import UUID from 'node-uuid';
import Sproxy from 'sproxydclient';
import { errors } from 'arsenal';
import { Logger } from 'werelogs';
import file from './file/backend';
import inMemory from './in_memory/backend';
import config from '../Config';
const logger = new Logger('MultipleBackendGateway', {
logLevel: config.log.logLevel,
dumpLevel: config.log.dumpLevel,
});
function createLogger(reqUids) {
return reqUids ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger();
}
function _createAwsKey(requestBucketName, requestObjectKey,
partNumber, uploadId) {
const unique = UUID.v4();
// TODO: Discuss how we want to generate keys. Not having unique feature
// is too dangerous since we could have cleanup deletes on a key being
// called after a new object was created
return `${requestBucketName}/uploadId-${uploadId}/` +
`partNumber-${partNumber}/${requestObjectKey}/${unique}`;
}
const clients = {};
Object.keys(config.locationConstraints).forEach(location => {
const locationObj = config.locationConstraints[location];
if (locationObj.type === 'mem') {
clients[location] = inMemory;
}
if (locationObj.type === 'file') {
clients[location] = file;
}
if (locationObj.type === 'scality_s3'
&& locationObj.information.connector === 'sproxyd') {
clients[location] = new Sproxy({
bootstrap: locationObj.information.connector
.sproxyd.bootstrap,
log: config.log,
// Might be undefined which is ok since there is a default
// set in sproxydclient if chordCos is undefined
chordCos: locationObj.information.connector.sproxyd.chordCos,
});
}
if (locationObj.type === 'aws_s3') {
clients[location] = new AWS.S3({
endpoint: `https://${locationObj.information.endpoint}`,
// Non-file stream objects are not supported with SigV4 (node sdk)
// so note that we are using the default of signatureVersion v2
// consider disabling
debug: true,
// perhaps use this rather than setting ourselves. Not implemented yet for streams in node sdk!!!
computeChecksums: true,
credentials: new AWS.SharedIniFileCredentials({ profile:
locationObj.information.credentialsProfile }),
});
clients[location].clientType = 'aws_s3';
clients[location].awsBucketName = locationObj.information.bucketName;
clients[location].dataStoreName = location;
}
if (locationObj.type === 'virtual-user-metadata') {
// TODO
// clients[location] = some sort of bucketclient
}
});
const multipleBackendGateway = {
put: (stream, size, keyContext, backendInfo, reqUids, callback) => {
const controllingLocationConstraint =
backendInfo.getControllingLocationConstraint();
const client = clients[controllingLocationConstraint];
if (!client) {
const log = createLogger(reqUids);
log.error('no data backend matching controlling locationConstraint',
{ controllingLocationConstraint });
return process.nextTick(() => {
callback(errors.InternalError);
});
}
// client is AWS SDK
if (client.clientType === 'aws_s3') {
const partNumber = keyContext.partNumber || '00000';
const uploadId = keyContext.uploadId || '00000';
const awsKey = _createAwsKey(keyContext.bucketName,
keyContext.objectKey, partNumber, uploadId);
return client.putObject({
Bucket: client.awsBucketName,
Key: awsKey,
Body: stream,
ContentLength: size,
//Must fix!!! Use this or see if computeChecksums handles it
//for us
// TODO: This should be in listener to make sure
// we have the completedHash. Also, if we pre-encrypt,
// this will not work. Need to get hash of encrypted version.
// Sending ContentMD5 is needed so that AWS will check to
// make sure it is receiving the correct data.
// ContentMD5: stream.completedHash,
},
(err, data) => {
if (err) {
const log = createLogger(reqUids);
log.error('err from data backend',
{ err, dataStoreName: client.dataStoreName });
// TODO: consider passing through error
// rather than translating though could be confusing
// (e.g., NoSuchBucket error when request was
// actually made to the Scality s3 bucket name)
return callback(errors.InternalError);
}
const dataRetrievalInfo = {
key: awsKey,
dataStoreName: client.dataStoreName,
// because of encryption the ETag here could be
// different from our metadata so let's store it
dataStoreETag: data.ETag,
};
return callback(null, dataRetrievalInfo);
});
}
return client.put(stream, size, keyContext,
reqUids, callback);
},
get: (objectGetInfo, range, reqUids, callback) => {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
const client = clients[objectGetInfo.dataStoreName];
if (client.clientType === 'aws_s3') {
return callback(null, client.getObject({
Bucket: client.awsBucketName,
Key: key,
Range: range,
}).createReadStream());
}
return client.get(objectGetInfo, range, reqUids, callback);
},
delete: (objectGetInfo, reqUids, callback) => {
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
const client = clients[objectGetInfo.dataStoreName];
if (client.clientType === 'aws_s3') {
return client.deleteObject({
Bucket: client.awsBucketName,
Key: key,
});
}
return client.delete(objectGetInfo, reqUids, callback);
},
// checkHealth: to aggregate from multiple?
};
export default multipleBackendGateway;
// DO WRAPPER STUFF BASED ON REQUEST/STORED INFO
//
// For GETS and DELETES use objectGetInfo implName
// For PUTS:
// 1) check x-amz-meta-scal-location-constraint on put/copy
// 2) bucket location constraint
// 3) default for endpoint hit.

View File

@ -1,8 +1,10 @@
import async from 'async'; import async from 'async';
import { errors } from 'arsenal'; import { errors } from 'arsenal';
import Sproxy from 'sproxydclient'; import Sproxy from 'sproxydclient';
import file from './file/backend'; import file from './file/backend';
import inMemory from './in_memory/backend'; import inMemory from './in_memory/backend';
import multipleBackendGateway from './multipleBackendGateway';
import config from '../Config'; import config from '../Config';
import MD5Sum from '../utilities/MD5Sum'; import MD5Sum from '../utilities/MD5Sum';
import assert from 'assert'; import assert from 'assert';
@ -24,11 +26,15 @@ if (config.backends.data === 'mem') {
chordCos: config.sproxyd.chordCos, chordCos: config.sproxyd.chordCos,
}); });
implName = 'sproxyd'; implName = 'sproxyd';
} else if (config.backends.data === 'multiple') {
client = multipleBackendGateway;
implName = 'multipleBackends';
} }
/** /**
* _retryDelete - Attempt to delete key again if it failed previously * _retryDelete - Attempt to delete key again if it failed previously
* @param {string} key - location of the object to delete * @param { string | object } objectGetInfo - either string location of object
* to delete or object containing info of object to delete
* @param {object} log - Werelogs request logger * @param {object} log - Werelogs request logger
* @param {number} count - keeps count of number of times function has been run * @param {number} count - keeps count of number of times function has been run
* @param {function} cb - callback * @param {function} cb - callback
@ -36,20 +42,20 @@ if (config.backends.data === 'mem') {
*/ */
const MAX_RETRY = 2; const MAX_RETRY = 2;
function _retryDelete(key, log, count, cb) { function _retryDelete(objectGetInfo, log, count, cb) {
if (count > MAX_RETRY) { if (count > MAX_RETRY) {
return cb(errors.InternalError); return cb(errors.InternalError);
} }
return client.delete(key, log.getSerializedUids(), err => { return client.delete(objectGetInfo, log.getSerializedUids(), err => {
if (err) { if (err) {
return _retryDelete(key, log, count + 1, cb); return _retryDelete(objectGetInfo, log, count + 1, cb);
} }
return cb(); return cb();
}); });
} }
const data = { const data = {
put: (cipherBundle, value, valueSize, keyContext, log, cb) => { put: (cipherBundle, value, valueSize, keyContext, backendInfo, log, cb) => {
assert.strictEqual(typeof valueSize, 'number'); assert.strictEqual(typeof valueSize, 'number');
log.debug('sending put to datastore', { implName, keyContext, log.debug('sending put to datastore', { implName, keyContext,
method: 'put' }); method: 'put' });
@ -61,20 +67,33 @@ const data = {
writeStream = cipherBundle.cipher; writeStream = cipherBundle.cipher;
hashedStream.pipe(writeStream); hashedStream.pipe(writeStream);
} }
if (implName === 'multipleBackends') {
client.put(writeStream, valueSize, keyContext, log.getSerializedUids(), // Need to send backendInfo to client.put and
(err, key) => { // client.put will provide dataRetrievalInfo so no
if (err) { // need to construct here
log.error('error from datastore', return client.put(writeStream, valueSize, keyContext, backendInfo,
log.getSerializedUids(), (err, dataRetrievalInfo) => {
if (err) {
log.error('error from datastore',
{ error: err, implName });
return cb(errors.InternalError);
}
return cb(null, dataRetrievalInfo, hashedStream);
});
}
return client.put(writeStream, valueSize, keyContext,
log.getSerializedUids(), (err, key) => {
if (err) {
log.error('error from datastore',
{ error: err, implName }); { error: err, implName });
return cb(errors.InternalError); return cb(errors.InternalError);
} }
const dataRetrievalInfo = { const dataRetrievalInfo = {
key, key,
dataStoreName: implName, dataStoreName: implName,
}; };
return cb(null, dataRetrievalInfo, hashedStream); return cb(null, dataRetrievalInfo, hashedStream);
}); });
}, },
get: (objectGetInfo, log, cb) => { get: (objectGetInfo, log, cb) => {
@ -84,34 +103,37 @@ const data = {
const range = objectGetInfo.range; const range = objectGetInfo.range;
log.debug('sending get to datastore', { implName, key, log.debug('sending get to datastore', { implName, key,
range, method: 'get' }); range, method: 'get' });
client.get(key, range, log.getSerializedUids(), (err, stream) => { client.get(objectGetInfo, range, log.getSerializedUids(),
if (err) { (err, stream) => {
log.error('error from sproxyd', { error: err }); if (err) {
return cb(errors.InternalError); log.error('error from datastore', { error: err, implName });
} return cb(errors.InternalError);
if (objectGetInfo.cipheredDataKey) { }
const serverSideEncryption = { if (objectGetInfo.cipheredDataKey) {
cryptoScheme: objectGetInfo.cryptoScheme, const serverSideEncryption = {
masterKeyId: objectGetInfo.masterKeyId, cryptoScheme: objectGetInfo.cryptoScheme,
cipheredDataKey: Buffer.from(objectGetInfo.cipheredDataKey, masterKeyId: objectGetInfo.masterKeyId,
'base64'), cipheredDataKey: Buffer.from(
}; objectGetInfo.cipheredDataKey, 'base64'),
const offset = objectGetInfo.range ? objectGetInfo.range[0] : 0; };
return kms.createDecipherBundle( const offset = objectGetInfo.range ?
serverSideEncryption, offset, log, objectGetInfo.range[0] : 0;
(err, decipherBundle) => { return kms.createDecipherBundle(
if (err) { serverSideEncryption, offset, log,
log.error('cannot get decipher bundle from kms', { (err, decipherBundle) => {
method: 'data.wrapper.data.get', if (err) {
}); log.error('cannot get decipher bundle ' +
return cb(err); 'from kms', {
} method: 'data.wrapper.data.get',
stream.pipe(decipherBundle.decipher); });
return cb(null, decipherBundle.decipher); return cb(err);
}); }
} stream.pipe(decipherBundle.decipher);
return cb(null, stream); return cb(null, decipherBundle.decipher);
}); });
}
return cb(null, stream);
});
}, },
delete: (objectGetInfo, log, cb) => { delete: (objectGetInfo, log, cb) => {
@ -124,7 +146,7 @@ const data = {
key, key,
method: 'delete', method: 'delete',
}); });
_retryDelete(key, log, 0, err => { _retryDelete(objectGetInfo, log, 0, err => {
if (err) { if (err) {
log.error('error deleting object from datastore', log.error('error deleting object from datastore',
{ error: err, key }); { error: err, key });
@ -170,6 +192,7 @@ const data = {
return cb(null, defResp); return cb(null, defResp);
} }
return client.healthcheck(log, (err, result) => { return client.healthcheck(log, (err, result) => {
//NEED TO UPDATE FOR POSSIBILITY OF MULTIPLE HEALTHCHECKS HERE!!
const respBody = {}; const respBody = {};
if (err) { if (err) {
log.error(`error from ${implName}`, { error: err }); log.error(`error from ${implName}`, { error: err });

View File

@ -72,7 +72,11 @@ utils.getAllRegions = function getAllRegions() {
'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1', 'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1',
'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2', 'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2',
'us-gov-west-1']; 'us-gov-west-1'];
return Object.keys(config.regions).concat(awsOfficialRegions); if (config.regions !== undefined) {
return Object.keys(config.regions).concat(awsOfficialRegions);
}
return Object.keys(config.restEndpoints).map(e =>
config.restEndpoints[e]).concat(awsOfficialRegions);
}; };
/** /**
@ -81,9 +85,12 @@ utils.getAllRegions = function getAllRegions() {
* @returns {string[]} - list of valid endpoints * @returns {string[]} - list of valid endpoints
*/ */
utils.getAllEndpoints = function getAllEndpoints() { utils.getAllEndpoints = function getAllEndpoints() {
return Object.keys(config.regions) if (config.regions !== undefined) {
.map(r => config.regions[r]) return Object.keys(config.regions)
.reduce((a, b) => a.concat(b)); .map(r => config.regions[r])
.reduce((a, b) => a.concat(b));
}
return Object.keys(config.restEndpoints);
}; };
/** /**

View File

@ -19,6 +19,7 @@
}, },
"homepage": "https://github.com/scality/S3#readme", "homepage": "https://github.com/scality/S3#readme",
"dependencies": { "dependencies": {
"aws-sdk": "^2.2.11",
"arsenal": "scality/Arsenal", "arsenal": "scality/Arsenal",
"async": "~1.4.2", "async": "~1.4.2",
"babel-core": "^6.5.2", "babel-core": "^6.5.2",
@ -33,7 +34,7 @@
"multilevel": "^7.3.0", "multilevel": "^7.3.0",
"node-uuid": "^1.4.3", "node-uuid": "^1.4.3",
"ready-set-stream": "1.0.7", "ready-set-stream": "1.0.7",
"sproxydclient": "scality/sproxydclient", "sproxydclient": "scality/sproxydclient#ft/multipleBackendGet",
"utapi": "scality/utapi", "utapi": "scality/utapi",
"utf8": "~2.1.1", "utf8": "~2.1.1",
"vaultclient": "scality/vaultclient", "vaultclient": "scality/vaultclient",
@ -44,7 +45,6 @@
"ioctl": "2.0.0" "ioctl": "2.0.0"
}, },
"devDependencies": { "devDependencies": {
"aws-sdk": "^2.2.11",
"babel-cli": "^6.2.0", "babel-cli": "^6.2.0",
"babel-eslint": "^6.0.0", "babel-eslint": "^6.0.0",
"bluebird": "^3.3.1", "bluebird": "^3.3.1",

View File

@ -12,7 +12,7 @@ describe('utils.getBucketNameFromHost', () => {
'buck.et', 'bu.ck.et', 'bu.ck-et', 'buck.et', 'bu.ck.et', 'bu.ck-et',
].forEach(bucket => { ].forEach(bucket => {
const headers = { const headers = {
host: `${bucket}.s3.eu-west-1.amazonaws.com`, host: `${bucket}.s3.amazonaws.com`,
}; };
const result = utils.getBucketNameFromHost({ headers }); const result = utils.getBucketNameFromHost({ headers });
assert.strictEqual(result, bucket); assert.strictEqual(result, bucket);
@ -36,12 +36,6 @@ describe('utils.getBucketNameFromHost', () => {
it('should return undefined when non dns-style', () => { it('should return undefined when non dns-style', () => {
[ [
's3.amazonaws.com', 's3.amazonaws.com',
's3.eu.central-1.amazonaws.com',
's3.eu-west-1.amazonaws.com',
's3-external-1.amazonaws.com',
's3.us-east-1.amazonaws.com',
's3-us-gov-west-1.amazonaws.com',
's3-fips-us-gov-west-1.amazonaws.com',
].forEach(host => { ].forEach(host => {
const headers = { host }; const headers = { host };
const result = utils.getBucketNameFromHost({ headers }); const result = utils.getBucketNameFromHost({ headers });
@ -97,7 +91,9 @@ describe('utils.getAllRegions', () => {
it('should return regions from config', () => { it('should return regions from config', () => {
const allRegions = utils.getAllRegions(); const allRegions = utils.getAllRegions();
assert(allRegions.indexOf('localregion') >= 0); assert(allRegions.indexOf('scality-us-east-1') >= 0);
assert(allRegions.indexOf('scality-us-west-1') >= 0);
assert(allRegions.indexOf('aws-us-east-1') >= 0);
}); });
}); });
@ -105,11 +101,10 @@ describe('utils.getAllEndpoints', () => {
it('should return endpoints from config', () => { it('should return endpoints from config', () => {
const allEndpoints = utils.getAllEndpoints(); const allEndpoints = utils.getAllEndpoints();
assert(allEndpoints.indexOf('s3-us-west-2.amazonaws.com') >= 0); assert(allEndpoints.indexOf('127.0.0.1') >= 0);
assert(allEndpoints.indexOf('s3.docker.test') >= 0);
assert(allEndpoints.indexOf('127.0.0.2') >= 0);
assert(allEndpoints.indexOf('s3.amazonaws.com') >= 0); assert(allEndpoints.indexOf('s3.amazonaws.com') >= 0);
assert(allEndpoints.indexOf('s3-external-1.amazonaws.com') >= 0);
assert(allEndpoints.indexOf('s3.us-east-1.amazonaws.com') >= 0);
assert(allEndpoints.indexOf('localhost') >= 0);
}); });
}); });