Compare commits
20 Commits
developmen
...
demo/multB
Author | SHA1 | Date |
---|---|---|
Lauren Spiegel | c3e7ff96a8 | |
Dora Korpar | 5f4a9a27ce | |
Dora Korpar | dc2a5fe132 | |
Dora Korpar | fda82c81c9 | |
Dora Korpar | f80cbbd2e9 | |
Lauren Spiegel | 572a236a5d | |
Lauren Spiegel | a1583a60db | |
Lauren Spiegel | d8fc83a6d9 | |
Lauren Spiegel | 8ccdb3f829 | |
Lauren Spiegel | 6ad9c89cce | |
Lauren Spiegel | 7a0195f9ee | |
Lauren Spiegel | 069326b7dc | |
Lauren Spiegel | fcfcab1bac | |
Lauren Spiegel | dd580f2b16 | |
Lauren Spiegel | c75f17ff04 | |
Lauren Spiegel | b91575f9bd | |
Dora Korpar | 31e0cde919 | |
Dora Korpar | b382e96cc2 | |
Dora Korpar | 74a8e7cd9c | |
Dora Korpar | 94f97a6039 |
82
config.json
82
config.json
|
@ -1,24 +1,67 @@
|
|||
{
|
||||
"port": 8000,
|
||||
"listenOn": [],
|
||||
"regions": {
|
||||
"ap-northeast-1": ["s3.ap-northeast-1.amazonaws.com"],
|
||||
"ap-southeast-1": ["s3.ap-southeast-1.amazonaws.com"],
|
||||
"ap-southeast-2": ["s3.ap-southeast-2.amazonaws.com"],
|
||||
"eu-central-1": ["s3.eu-central-1.amazonaws.com",
|
||||
"s3.eu.central-1.amazonaws.com"],
|
||||
"eu-west-1": ["s3.eu-west-1.amazonaws.com"],
|
||||
"sa-east-1": ["s3.sa-east-1.amazonaws.com"],
|
||||
"us-east-1": ["s3.amazonaws.com",
|
||||
"s3-external-1.amazonaws.com",
|
||||
"s3.us-east-1.amazonaws.com"],
|
||||
"us-west-1": ["s3.us-west-1.amazonaws.com"],
|
||||
"us-west-2": ["s3-us-west-2.amazonaws.com"],
|
||||
"us-gov-west-1": ["s3-us-gov-west-1.amazonaws.com",
|
||||
"s3-fips-us-gov-west-1.amazonaws.com"],
|
||||
"localregion": ["localhost"],
|
||||
"test-region": ["s3.scality.test"],
|
||||
"docker-region": ["s3.docker.test"]
|
||||
"locationConstraints": {
|
||||
"aws-us-east-1": {
|
||||
"type": "aws_s3",
|
||||
"information": {
|
||||
"region": "us-east-1",
|
||||
"bucketName": "premadebucket",
|
||||
"credentialsProfile": "default"
|
||||
}
|
||||
},
|
||||
"aws-us-east-test": {
|
||||
"type": "aws_s3",
|
||||
"information": {
|
||||
"region": "us-east-1",
|
||||
"endpoint": "s3.amazonaws.com",
|
||||
"bucketName": "multitester444",
|
||||
"credentialsProfile": "default"
|
||||
}
|
||||
},
|
||||
"scality-us-east-1": {
|
||||
"type": "scality_s3",
|
||||
"information": {
|
||||
"region": "us-east-1",
|
||||
"connector": {
|
||||
"sproxyd": {
|
||||
"bootstrap": ["localhost:8181"]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"scality-us-west-1": {
|
||||
"type": "scality_s3",
|
||||
"information": {
|
||||
"region": "us-west-1",
|
||||
"connector": {
|
||||
"sproxyd": {
|
||||
"bootstrap": ["localhost:8182"]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"virtual-user-metadata": {
|
||||
"type": "scality_user_metadata",
|
||||
"information": {
|
||||
}
|
||||
},
|
||||
"file": {
|
||||
"type": "file",
|
||||
"information": {
|
||||
}
|
||||
},
|
||||
"mem": {
|
||||
"type": "mem",
|
||||
"information": {
|
||||
}
|
||||
}
|
||||
},
|
||||
"restEndpoints": {
|
||||
"127.0.0.1": "scality-us-east-1",
|
||||
"s3.docker.test": "scality-us-west-1",
|
||||
"127.0.0.2": "aws-us-east-1",
|
||||
"s3.amazonaws.com": "aws-us-east-1"
|
||||
},
|
||||
"websiteEndpoints": ["s3-website-us-east-1.amazonaws.com",
|
||||
"s3-website.us-east-2.amazonaws.com",
|
||||
|
@ -34,9 +77,6 @@
|
|||
"s3-website-sa-east-1.amazonaws.com",
|
||||
"s3-website.localhost",
|
||||
"s3-website.scality.test"],
|
||||
"sproxyd": {
|
||||
"bootstrap": ["localhost:8181"]
|
||||
},
|
||||
"bucketd": {
|
||||
"bootstrap": ["localhost"]
|
||||
},
|
||||
|
|
|
@ -66,4 +66,7 @@ export default {
|
|||
// hex digest of sha256 hash of empty string:
|
||||
emptyStringHash: crypto.createHash('sha256')
|
||||
.update('', 'binary').digest('hex'),
|
||||
|
||||
// user metadata header to set object locationConstraint
|
||||
objectLocationConstraintHeader: 'x-amz-meta-scal-location-constraint',
|
||||
};
|
||||
|
|
3
init.js
3
init.js
|
@ -40,7 +40,8 @@ function _setDirSyncFlag(path) {
|
|||
fs.closeSync(pathFD2);
|
||||
}
|
||||
|
||||
if (config.backends.data !== 'file' && config.backends.metadata !== 'file') {
|
||||
if (config.backends.data !== 'file' && config.backends.data !== 'multiple'
|
||||
|| config.backends.metadata === 'scality') {
|
||||
logger.info('No init required. Go forth and store data.');
|
||||
process.exit(0);
|
||||
}
|
||||
|
|
139
lib/Config.js
139
lib/Config.js
|
@ -9,6 +9,28 @@ import authDataChecker from './auth/in_memory/checker';
|
|||
const defaultHealthChecks = { allowFrom: ['127.0.0.1/8', '::1'] };
|
||||
|
||||
const defaultLocalCache = { host: '127.0.0.1', port: 6379 };
|
||||
|
||||
function sproxydAssert(configSproxyd, cb) {
|
||||
if (configSproxyd.bootstrap !== undefined) {
|
||||
assert(Array.isArray(configSproxyd.bootstrap)
|
||||
&& configSproxyd.bootstrap
|
||||
.every(e => typeof e === 'string'),
|
||||
'bad config: sproxyd.bootstrap must be a list of strings');
|
||||
assert(configSproxyd.bootstrap.length > 0,
|
||||
'sproxyd bootstrap list is empty');
|
||||
// thisSproxyd.bootstrap = configSproxyd.bootstrap;
|
||||
cb('bootstrap');
|
||||
}
|
||||
if (configSproxyd.chordCos !== undefined) {
|
||||
assert(typeof configSproxyd.chordCos === 'string',
|
||||
'bad config: sproxyd.chordCos must be a string');
|
||||
assert(configSproxyd.chordCos.match(/^[0-9a-fA-F]{2}$/),
|
||||
'bad config: sproxyd.chordCos must be a 2hex-chars string');
|
||||
// thisSproxyd.chordCos =
|
||||
// Number.parseInt(configSproxyd.chordCos, 16);
|
||||
cb('chordCos');
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Reads from a config file and returns the content as a config object
|
||||
*/
|
||||
|
@ -67,13 +89,75 @@ class Config {
|
|||
});
|
||||
}
|
||||
|
||||
assert(typeof config.regions === 'object',
|
||||
'bad config: the list of regions is mandatory');
|
||||
assert(Object.keys(config.regions).every(
|
||||
r => typeof r === 'string' && config.regions[r] instanceof Array
|
||||
&& config.regions[r].every(e => typeof e === 'string')),
|
||||
'bad config: regions must be a set of {region: [endpoints]}');
|
||||
this.regions = config.regions;
|
||||
// legacy
|
||||
if (config.regions !== undefined) {
|
||||
assert(typeof config.regions === 'object',
|
||||
'bad config: the list of regions is mandatory');
|
||||
assert(Object.keys(config.regions).every(
|
||||
r => typeof r === 'string' && config.regions[r] instanceof Array
|
||||
&& config.regions[r].every(e => typeof e === 'string')),
|
||||
'bad config: regions must be a set of {region: [endpoints]}');
|
||||
this.regions = config.regions;
|
||||
}
|
||||
|
||||
this.locationConstraints = {};
|
||||
if (config.locationConstraints !== undefined) {
|
||||
assert(typeof config.locationConstraints === 'object',
|
||||
'bad config: locationConstraints must be an object');
|
||||
Object.keys(config.locationConstraints).forEach(l => {
|
||||
assert(typeof config.locationConstraints[l].type === 'string' &&
|
||||
typeof config.locationConstraints[l].information
|
||||
=== 'object',
|
||||
'bad config: locationConstraints.type and ' +
|
||||
'locationConstraints.information are mandatory- type ' +
|
||||
'must be a string and information must be an object');
|
||||
this.locationConstraints[l] = config.locationConstraints[l];
|
||||
this.locationConstraints[l].type =
|
||||
config.locationConstraints[l].type;
|
||||
const info = config.locationConstraints[l].information;
|
||||
const stringFields = [
|
||||
'region',
|
||||
'bucketName',
|
||||
'credentialsProfile',
|
||||
];
|
||||
stringFields.forEach(field => {
|
||||
if (info.field !== undefined) {
|
||||
assert(typeof info.field === 'string',
|
||||
`bad config: ${field} must be a string`);
|
||||
this.locationConstraints[l].information.field =
|
||||
info.field;
|
||||
}
|
||||
});
|
||||
if (info.connector !== undefined) {
|
||||
assert(typeof info.connector === 'object',
|
||||
'bad config: connector must be an object');
|
||||
if (info.connector.sproxyd !== undefined) {
|
||||
sproxydAssert(info.connector.sproxyd, field => {
|
||||
if (field === 'chordCos') {
|
||||
this.locationConstraints[l].information
|
||||
.connector.sproxyd[field] =
|
||||
Number.parseInt(info.connector.
|
||||
sproxyd[field], 16);
|
||||
} else {
|
||||
this.locationConstraints[l].information
|
||||
.connector.sproxyd[field] =
|
||||
info.connector.sproxyd[field];
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.restEndpoints = {};
|
||||
if (config.restEndpoints !== undefined) {
|
||||
assert(typeof config.restEndpoints === 'object',
|
||||
'bad config: restEndpoints must be an object of endpoints');
|
||||
assert(Object.keys(config.restEndpoints).every(
|
||||
r => typeof config.restEndpoints[r] === 'string'),
|
||||
'bad config: each endpoint must be a string');
|
||||
this.restEndpoints = config.restEndpoints;
|
||||
}
|
||||
|
||||
this.websiteEndpoints = [];
|
||||
if (config.websiteEndpoints !== undefined) {
|
||||
|
@ -95,25 +179,17 @@ class Config {
|
|||
assert(typeof config.usEastBehavior === 'boolean');
|
||||
this.usEastBehavior = config.usEastBehavior;
|
||||
}
|
||||
// legacy
|
||||
this.sproxyd = { bootstrap: [] };
|
||||
if (config.sproxyd !== undefined) {
|
||||
if (config.sproxyd.bootstrap !== undefined) {
|
||||
assert(Array.isArray(config.sproxyd.bootstrap)
|
||||
&& config.sproxyd.bootstrap
|
||||
.every(e => typeof e === 'string'),
|
||||
'bad config: sproxyd.bootstrap must be a list of strings');
|
||||
assert(config.sproxyd.bootstrap.length > 0,
|
||||
'sproxyd bootstrap list is empty');
|
||||
this.sproxyd.bootstrap = config.sproxyd.bootstrap;
|
||||
}
|
||||
if (config.sproxyd.chordCos !== undefined) {
|
||||
assert(typeof config.sproxyd.chordCos === 'string',
|
||||
'bad config: sproxyd.chordCos must be a string');
|
||||
assert(config.sproxyd.chordCos.match(/^[0-9a-fA-F]{2}$/),
|
||||
'bad config: sproxyd.chordCos must be a 2hex-chars string');
|
||||
this.sproxyd.chordCos =
|
||||
Number.parseInt(config.sproxyd.chordCos, 16);
|
||||
}
|
||||
sproxydAssert(config.sproxyd, field => {
|
||||
if (field === 'chordCos') {
|
||||
this.sproxyd[field] =
|
||||
Number.parseInt(config.sproxyd[field], 16);
|
||||
} else {
|
||||
this.sproxyd[field] = config.sproxyd[field];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
this.bucketd = { bootstrap: [] };
|
||||
|
@ -329,8 +405,19 @@ class Config {
|
|||
}
|
||||
this.authData = authData;
|
||||
}
|
||||
if (process.env.S3SPROXYD) {
|
||||
data = process.env.S3SPROXYD;
|
||||
if (process.env.S3DATA) {
|
||||
const validData = ['mem', 'file', 'scality', 'multiple'];
|
||||
assert(validData.indexOf(process.env.S3DATA) > -1,
|
||||
'bad environment variable: S3DATA environment variable ' +
|
||||
'should be one of mem/file/scality/multiple'
|
||||
);
|
||||
if (process.env.S3DATA === 'multiple') {
|
||||
assert(config.locationConstraints !== undefined,
|
||||
'for multiple data backends, locationConstraints ' +
|
||||
'must be set'
|
||||
);
|
||||
}
|
||||
data = process.env.S3DATA;
|
||||
}
|
||||
if (process.env.S3METADATA) {
|
||||
metadata = process.env.S3METADATA;
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
import config from '../../../Config';
|
||||
|
||||
export class BackendInfo {
|
||||
/**
|
||||
* Represents the info necessary to evaluate which data backend to use
|
||||
* on a data put call.
|
||||
* @constructor
|
||||
* @param {string | undefined} objectLocationConstraint - location constraint
|
||||
* for object based on user meta header
|
||||
* @param {string | undefined } bucketLocationConstraint - location
|
||||
* constraint for bucket based on bucket metadata
|
||||
* @param {string} requestEndpoint - endpoint to which request was made
|
||||
*/
|
||||
constructor(objectLocationConstraint, bucketLocationConstraint,
|
||||
requestEndpoint) {
|
||||
this._objectLocationConstraint = objectLocationConstraint;
|
||||
this._bucketLocationConstraint = bucketLocationConstraint;
|
||||
this._requestEndpoint = requestEndpoint;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* validate proposed location constraint against config
|
||||
* @param {string | undefined} locationConstraint - value of user
|
||||
* metadata location constraint header or bucket location constraint
|
||||
* @param {object} log - werelogs logger
|
||||
* @return {boolean} - true if valid, false if not
|
||||
*/
|
||||
static isValidLocationConstraint(locationConstraint, log) {
|
||||
if (Object.keys(config.locationConstraints)
|
||||
.indexOf(locationConstraint) < 0) {
|
||||
log.trace('proposed locationConstraint is invalid',
|
||||
{ locationConstraint });
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* validate requestEndpoint against config
|
||||
* if there is a mismatch between the request endpoint and what is in
|
||||
* the config, this could cause a problem for setting the backend location
|
||||
* for storing data
|
||||
* @param {string} requestEndpoint - request endpoint
|
||||
* @param {object} log - werelogs logger
|
||||
* @return {boolean} - true if valid, false if not
|
||||
*/
|
||||
static isValidRequestEndpoint(requestEndpoint, log) {
|
||||
if (Object.keys(config.restEndpoints)
|
||||
.indexOf(requestEndpoint) < 0) {
|
||||
log.trace('requestEndpoint does not match config restEndpoints',
|
||||
{ requestEndpoint });
|
||||
return false;
|
||||
}
|
||||
if (Object.keys(config.locationConstraints).indexOf(config
|
||||
.restEndpoints[requestEndpoint]) < 0) {
|
||||
log.trace('the default locationConstraint for requestEndpoint ' +
|
||||
'does not match any config locationConstraint',
|
||||
{ requestEndpoint });
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* validate proposed BackendInfo parameters
|
||||
* @param {string | undefined} objectLocationConstraint - value of user
|
||||
* metadata location constraint header
|
||||
* @param {string | null} bucketLocationConstraint - location
|
||||
* constraint from bucket metadata
|
||||
* @param {string} requestEndpoint - endpoint of request
|
||||
* @param {object} log - werelogs logger
|
||||
* @return {boolean} - true if valid, false if not
|
||||
*/
|
||||
static areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint, log) {
|
||||
if (objectLocationConstraint &&
|
||||
!BackendInfo.isValidLocationConstraint(objectLocationConstraint,
|
||||
log)) {
|
||||
log.trace('objectLocationConstraint is invalid');
|
||||
return false;
|
||||
}
|
||||
if (bucketLocationConstraint &&
|
||||
!BackendInfo.isValidLocationConstraint(bucketLocationConstraint,
|
||||
log)) {
|
||||
log.trace('bucketLocationConstraint is invalid');
|
||||
return false;
|
||||
}
|
||||
if (!BackendInfo.isValidRequestEndpoint(requestEndpoint, log)) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return objectLocationConstraint
|
||||
* @return {string | undefined} objectLocationConstraint;
|
||||
*/
|
||||
getObjectLocationConstraint() {
|
||||
return this._objectLocationConstraint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return bucketLocationConstraint
|
||||
* @return {string | undefined} bucketLocationConstraint;
|
||||
*/
|
||||
getBucketLocationConstraint() {
|
||||
return this._bucketLocationConstraint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return requestEndpoint
|
||||
* @return {string} requestEndpoint;
|
||||
*/
|
||||
getRequestEndpoint() {
|
||||
return this._requestEndpoint;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return locationConstraint that should be used with put request
|
||||
* Order of priority is:
|
||||
* (1) objectLocationConstraint,
|
||||
* (2) bucketLocationConstraint,
|
||||
* (3) default locationConstraint for requestEndpoint
|
||||
* @return {string} locationConstraint;
|
||||
*/
|
||||
getControllingLocationConstraint() {
|
||||
if (this._objectLocationConstraint) {
|
||||
return this.getObjectLocationConstraint();
|
||||
}
|
||||
if (this._bucketLocationConstraint) {
|
||||
return this.getBucketLocationConstraint();
|
||||
}
|
||||
return config.restEndpoints[this.getRequestEndpoint];
|
||||
}
|
||||
}
|
|
@ -55,7 +55,7 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Stores object and responds back with location and storage type
|
||||
* Stores object and responds back with key and storage type
|
||||
* @param {object} objectContext - object's keyContext for sproxyd Key
|
||||
* computation (put API)
|
||||
* @param {object} cipherBundle - cipher bundle that encrypt the data
|
||||
|
@ -64,14 +64,16 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
|
|||
* @param {object | null } streamingV4Params - if v4 auth, object containing
|
||||
* accessKey, signatureFromRequest, region, scopeDate, timestamp, and
|
||||
* credentialScope (to be used for streaming v4 auth if applicable)
|
||||
* @param {BackendInfo} backendInfo - info to determine which data
|
||||
* backend to use
|
||||
* @param {RequestLogger} log - the current stream logger
|
||||
* @param {function} cb - callback containing result for the next task
|
||||
* @return {undefined}
|
||||
*/
|
||||
export function dataStore(objectContext, cipherBundle, stream, size,
|
||||
streamingV4Params, log, cb) {
|
||||
streamingV4Params, backendInfo, log, cb) {
|
||||
const dataStream = prepareStream(stream, streamingV4Params, log, cb);
|
||||
data.put(cipherBundle, dataStream, size, objectContext, log,
|
||||
data.put(cipherBundle, dataStream, size, objectContext, backendInfo, log,
|
||||
(err, dataRetrievalInfo, hashedStream) => {
|
||||
if (err) {
|
||||
log.error('error in datastore', {
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
import async from 'async';
|
||||
import { errors } from 'arsenal';
|
||||
|
||||
import { BackendInfo } from './apiUtils/object/BackendInfo';
|
||||
import constants from '../../constants';
|
||||
import data from '../data/wrapper';
|
||||
import kms from '../kms/wrapper';
|
||||
import { logger } from '../utilities/logger';
|
||||
|
@ -128,6 +130,7 @@ function objectCopy(authInfo, request, sourceBucket,
|
|||
bucketName: destBucketName,
|
||||
owner: authInfo.getCanonicalID(),
|
||||
namespace: request.namespace,
|
||||
objectKey: destObjectKey,
|
||||
};
|
||||
const websiteRedirectHeader =
|
||||
request.headers['x-amz-website-redirect-location'];
|
||||
|
@ -216,6 +219,22 @@ function objectCopy(authInfo, request, sourceBucket,
|
|||
return next(null, storeMetadataParams, dataLocator, destObjMD,
|
||||
serverSideEncryption);
|
||||
}
|
||||
const objectLocationConstraint = storeMetadataParams
|
||||
.metaHeaders[constants.objectLocationConstraintHeader];
|
||||
const bucketLocationConstraint = destBucketMD
|
||||
.getLocationConstraint();
|
||||
const requestEndpoint = request.parsedHost;
|
||||
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint)) {
|
||||
return process.nextTick(() => {
|
||||
next(errors.InvalidArgument
|
||||
.customizeDescription('Location Constraint Info is ' +
|
||||
'invalid.'));
|
||||
});
|
||||
}
|
||||
const backendInfo = new BackendInfo(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint);
|
||||
|
||||
// dataLocator is an array. need to get and put all parts
|
||||
// For now, copy 1 part at a time. Could increase the second
|
||||
// argument here to increase the number of parts
|
||||
|
@ -236,7 +255,8 @@ function objectCopy(authInfo, request, sourceBucket,
|
|||
return cb(errors.InternalError);
|
||||
}
|
||||
return data.put(cipherBundle, stream,
|
||||
part.size, dataStoreContext, log,
|
||||
part.size, dataStoreContext,
|
||||
backendInfo, log,
|
||||
(error, partRetrievalInfo) => {
|
||||
if (error) {
|
||||
return cb(error);
|
||||
|
@ -258,8 +278,10 @@ function objectCopy(authInfo, request, sourceBucket,
|
|||
}
|
||||
// Copied object is not encrypted so just put it
|
||||
// without a cipherBundle
|
||||
|
||||
return data.put(null, stream, part.size,
|
||||
dataStoreContext, log, (error, partRetrievalInfo) => {
|
||||
dataStoreContext, backendInfo,
|
||||
log, (error, partRetrievalInfo) => {
|
||||
if (error) {
|
||||
return cb(error);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import { errors } from 'arsenal';
|
||||
|
||||
import { BackendInfo } from './apiUtils/object/BackendInfo';
|
||||
import constants from '../../constants';
|
||||
import services from '../services';
|
||||
import validateHeaders from '../utilities/validateHeaders';
|
||||
import { pushMetric } from '../utapi/utilities';
|
||||
|
@ -51,8 +53,21 @@ export default function objectDelete(authInfo, request, log, cb) {
|
|||
contentLength: objMD['content-length'],
|
||||
});
|
||||
}
|
||||
return services.deleteObject(bucketName, objMD, objectKey, log,
|
||||
err => {
|
||||
const objectLocationConstraint = request
|
||||
.headers[constants.objectLocationConstraintHeader];
|
||||
const bucketLocationConstraint = bucket.getLocationConstraint();
|
||||
const requestEndpoint = request.parsedHost;
|
||||
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint)) {
|
||||
return process.nextTick(() => {
|
||||
cb(errors.InvalidArgument
|
||||
.customizeDescription('Location Constraint Info is ' +
|
||||
'invalid.'));
|
||||
});
|
||||
}
|
||||
|
||||
return services.deleteObject(bucketName, objMD, objectKey,
|
||||
log, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import { errors } from 'arsenal';
|
||||
|
||||
import { BackendInfo } from './apiUtils/object/BackendInfo';
|
||||
import data from '../data/wrapper';
|
||||
import services from '../services';
|
||||
import aclUtils from '../utilities/aclUtils';
|
||||
|
@ -26,7 +27,7 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
|||
});
|
||||
}
|
||||
|
||||
function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
|
||||
function _storeIt(bucket, objectKey, objMD, authInfo, canonicalID,
|
||||
cipherBundle, request, streamingV4Params, log, callback) {
|
||||
const size = request.parsedContentLength;
|
||||
|
||||
|
@ -41,10 +42,12 @@ function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
|
|||
|
||||
const metaHeaders = utils.getMetaHeaders(request.headers);
|
||||
log.trace('meta headers', { metaHeaders, method: 'objectPut' });
|
||||
const bucketName = bucket.getName();
|
||||
const objectKeyContext = {
|
||||
bucketName,
|
||||
owner: canonicalID,
|
||||
namespace: request.namespace,
|
||||
objectKey,
|
||||
};
|
||||
// If the request was made with a pre-signed url, the x-amz-acl 'header'
|
||||
// might be in the query string rather than the actual headers so include
|
||||
|
@ -81,8 +84,23 @@ function _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
|
|||
log.trace('storing object in data', {
|
||||
method: 'services.metadataValidateAuthorization',
|
||||
});
|
||||
const objectLocationConstraint = request
|
||||
.headers[constants.objectLocationConstraintHeader];
|
||||
const bucketLocationConstraint = bucket.getLocationConstraint();
|
||||
const requestEndpoint = request.parsedHost;
|
||||
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint)) {
|
||||
return process.nextTick(() => {
|
||||
callback(errors.InvalidArgument
|
||||
.customizeDescription('Location Constraint Info is ' +
|
||||
'invalid.'));
|
||||
});
|
||||
}
|
||||
const backendInfo = new BackendInfo(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint);
|
||||
return dataStore(objectKeyContext, cipherBundle, request, size,
|
||||
streamingV4Params, log, (err, dataGetInfo, calculatedHash) => {
|
||||
streamingV4Params, backendInfo, log,
|
||||
(err, dataGetInfo, calculatedHash) => {
|
||||
if (err) {
|
||||
log.trace('error from data', {
|
||||
error: err,
|
||||
|
@ -225,13 +243,13 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
if (err) {
|
||||
return callback(errors.InternalError);
|
||||
}
|
||||
return _storeIt(bucketName, objectKey,
|
||||
return _storeIt(bucket, objectKey,
|
||||
objMD, authInfo, canonicalID,
|
||||
cipherBundle, request,
|
||||
streamingV4Params, log, callback);
|
||||
});
|
||||
}
|
||||
return _storeIt(bucketName, objectKey, objMD,
|
||||
return _storeIt(bucket, objectKey, objMD,
|
||||
authInfo, canonicalID, null, request,
|
||||
streamingV4Params, log, callback);
|
||||
});
|
||||
|
@ -243,12 +261,12 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
if (err) {
|
||||
return callback(errors.InternalError);
|
||||
}
|
||||
return _storeIt(bucketName, objectKey, objMD,
|
||||
return _storeIt(bucket, objectKey, objMD,
|
||||
authInfo, canonicalID, cipherBundle,
|
||||
request, streamingV4Params, log, callback);
|
||||
});
|
||||
}
|
||||
return _storeIt(bucketName, objectKey, objMD, authInfo, canonicalID,
|
||||
return _storeIt(bucket, objectKey, objMD, authInfo, canonicalID,
|
||||
null, request, streamingV4Params, log, callback);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import async from 'async';
|
||||
import { errors } from 'arsenal';
|
||||
|
||||
import { BackendInfo } from './apiUtils/object/BackendInfo';
|
||||
import constants from '../../constants';
|
||||
import data from '../data/wrapper';
|
||||
import kms from '../kms/wrapper';
|
||||
|
@ -30,6 +31,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
log.debug('processing request', { method: 'objectPutCopyPart' });
|
||||
const destBucketName = request.bucketName;
|
||||
const destObjectKey = request.objectKey;
|
||||
const mpuBucketName = `${constants.mpuBucketPrefix}${destBucketName}`;
|
||||
const valGetParams = {
|
||||
authInfo,
|
||||
bucketName: sourceBucket,
|
||||
|
@ -74,6 +76,9 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
bucketName: destBucketName,
|
||||
owner: authInfo.getCanonicalID(),
|
||||
namespace: request.namespace,
|
||||
objectKey: destObjectKey,
|
||||
partNumber: paddedPartNumber,
|
||||
uploadId,
|
||||
};
|
||||
|
||||
return async.waterfall([
|
||||
|
@ -123,29 +128,83 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
copyObjectSize);
|
||||
});
|
||||
},
|
||||
function checkMPUBucketAuth(dataLocator, destBucketMD,
|
||||
// get MPU shadow bucket to get splitter based on MD version
|
||||
function getMpuShadowBucket(dataLocator, destBucketMD,
|
||||
copyObjectSize, next) {
|
||||
return services.metadataValidateMultipart(valMPUParams,
|
||||
return metadata.getBucket(mpuBucketName, log,
|
||||
(err, mpuBucket) => {
|
||||
if (err && err.NoSuchBucket) {
|
||||
return next(errors.NoSuchUpload);
|
||||
}
|
||||
if (err) {
|
||||
log.trace('error authorizing based on mpu bucket',
|
||||
{ error: err });
|
||||
log.error('error getting the shadow mpu bucket', {
|
||||
error: err,
|
||||
method: 'objectPutCopyPart::metadata.getBucket',
|
||||
});
|
||||
return next(err);
|
||||
}
|
||||
return next(null, dataLocator,
|
||||
destBucketMD, mpuBucket, copyObjectSize);
|
||||
let splitter = constants.splitter;
|
||||
if (mpuBucket.getMdBucketModelVersion() < 2) {
|
||||
splitter = constants.oldSplitter;
|
||||
}
|
||||
return next(null, dataLocator, destBucketMD,
|
||||
copyObjectSize, splitter);
|
||||
});
|
||||
},
|
||||
// Get MPU overview object to check authorization to put a part
|
||||
// and to get any object location constraint info
|
||||
function getMpuOverviewObject(dataLocator, destBucketMD,
|
||||
copyObjectSize, splitter, next) {
|
||||
const mpuOverviewKey =
|
||||
`overview${splitter}${destObjectKey}${splitter}${uploadId}`;
|
||||
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, log,
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
log.error('error getting overview object from ' +
|
||||
'mpu bucket', {
|
||||
error: err,
|
||||
method: 'objectPutCopyPart::' +
|
||||
'metadata.getObjectMD',
|
||||
});
|
||||
return next(err);
|
||||
}
|
||||
const initiatorID = res.initiator.ID;
|
||||
const requesterID = authInfo.isRequesterAnIAMUser() ?
|
||||
authInfo.getArn() : authInfo.getCanonicalID();
|
||||
if (initiatorID !== requesterID) {
|
||||
return next(errors.AccessDenied);
|
||||
}
|
||||
const objectLocationConstraint =
|
||||
res[constants.objectLocationConstraintHeader];
|
||||
return next(null, dataLocator, destBucketMD,
|
||||
objectLocationConstraint, copyObjectSize, next);
|
||||
});
|
||||
},
|
||||
function goGetData(dataLocator, destBucketMD,
|
||||
mpuBucket, copyObjectSize, next) {
|
||||
objectLocationConstraint, copyObjectSize, next) {
|
||||
const serverSideEncryption = destBucketMD.getServerSideEncryption();
|
||||
|
||||
// skip if 0 byte object
|
||||
if (dataLocator.length === 0) {
|
||||
return next(null, [], constants.emptyFileMd5,
|
||||
copyObjectSize, mpuBucket,
|
||||
serverSideEncryption);
|
||||
return process.nextTick(() => {
|
||||
next(null, [], constants.emptyFileMd5,
|
||||
copyObjectSize, serverSideEncryption);
|
||||
});
|
||||
}
|
||||
const bucketLocationConstraint = destBucketMD
|
||||
.getLocationConstraint();
|
||||
const requestEndpoint = request.parsedHost;
|
||||
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint)) {
|
||||
return process.nextTick(() => {
|
||||
next(errors.InvalidArgument
|
||||
.customizeDescription('Location Constraint Info is ' +
|
||||
'invalid.'));
|
||||
});
|
||||
}
|
||||
const backendInfo = new BackendInfo(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint);
|
||||
|
||||
// totalHash will be sent through the RelayMD5Sum transform streams
|
||||
// to collect the md5 from multiple streams
|
||||
let totalHash;
|
||||
|
@ -178,7 +237,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
return cb(errors.InternalError);
|
||||
}
|
||||
return data.put(cipherBundle, hashedStream,
|
||||
numberPartSize, dataStoreContext, log,
|
||||
numberPartSize, dataStoreContext,
|
||||
backendInfo, log,
|
||||
(error, partRetrievalInfo) => {
|
||||
if (error) {
|
||||
log.debug('error putting ' +
|
||||
|
@ -210,7 +270,8 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
// Copied object is not encrypted so just put it
|
||||
// without a cipherBundle
|
||||
return data.put(null, hashedStream, numberPartSize,
|
||||
dataStoreContext, log, (error, partRetrievalInfo) => {
|
||||
dataStoreContext, backendInfo,
|
||||
log, (error, partRetrievalInfo) => {
|
||||
if (error) {
|
||||
log.debug('error putting object part',
|
||||
{ error });
|
||||
|
@ -234,15 +295,14 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
// Digest the final combination of all of the part streams
|
||||
totalHash = totalHash.digest('hex');
|
||||
return next(null, locations, totalHash,
|
||||
copyObjectSize, mpuBucket,
|
||||
serverSideEncryption);
|
||||
copyObjectSize, serverSideEncryption);
|
||||
});
|
||||
},
|
||||
function getExistingPartInfo(locations, totalHash,
|
||||
copyObjectSize, mpuBucket, serverSideEncryption, next) {
|
||||
copyObjectSize, serverSideEncryption, next) {
|
||||
const partKey =
|
||||
`${uploadId}${constants.splitter}${paddedPartNumber}`;
|
||||
metadata.getObjectMD(mpuBucket.getName(), partKey, log,
|
||||
metadata.getObjectMD(mpuBucketName, partKey, log,
|
||||
(err, result) => {
|
||||
// If there is nothing being overwritten just move on
|
||||
if (err && !err.NoSuchKey) {
|
||||
|
@ -260,12 +320,11 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
oldLocations : [oldLocations];
|
||||
}
|
||||
return next(null, locations, totalHash,
|
||||
copyObjectSize, mpuBucket, serverSideEncryption,
|
||||
oldLocations);
|
||||
copyObjectSize, serverSideEncryption, oldLocations);
|
||||
});
|
||||
},
|
||||
function storeNewPartMetadata(locations, totalHash,
|
||||
copyObjectSize, mpuBucket, serverSideEncryption,
|
||||
copyObjectSize, serverSideEncryption,
|
||||
oldLocations, next) {
|
||||
const lastModified = new Date().toJSON();
|
||||
const metaStoreParams = {
|
||||
|
@ -276,7 +335,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
|||
splitter: constants.splitter,
|
||||
lastModified,
|
||||
};
|
||||
return services.metadataStorePart(mpuBucket.getName(),
|
||||
return services.metadataStorePart(mpuBucketName,
|
||||
locations, metaStoreParams, log, err => {
|
||||
if (err) {
|
||||
log.debug('error storing new metadata',
|
||||
|
|
|
@ -2,6 +2,7 @@ import assert from 'assert';
|
|||
import async from 'async';
|
||||
import { errors } from 'arsenal';
|
||||
|
||||
import { BackendInfo } from './apiUtils/object/BackendInfo';
|
||||
import constants from '../../constants';
|
||||
import data from '../data/wrapper';
|
||||
import kms from '../kms/wrapper';
|
||||
|
@ -81,33 +82,35 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
const objectKey = request.objectKey;
|
||||
return async.waterfall([
|
||||
// Get the destination bucket.
|
||||
next => metadata.getBucket(bucketName, log, (err, bucket) => {
|
||||
if (err && err.NoSuchBucket) {
|
||||
return next(errors.NoSuchBucket);
|
||||
}
|
||||
if (err) {
|
||||
log.error('error getting the destination bucket', {
|
||||
error: err,
|
||||
method: 'objectPutPart::metadata.getBucket',
|
||||
});
|
||||
return next(err);
|
||||
}
|
||||
return next(null, bucket);
|
||||
}),
|
||||
next => metadata.getBucket(bucketName, log,
|
||||
(err, destinationBucket) => {
|
||||
if (err && err.NoSuchBucket) {
|
||||
return next(errors.NoSuchBucket);
|
||||
}
|
||||
if (err) {
|
||||
log.error('error getting the destination bucket', {
|
||||
error: err,
|
||||
method: 'objectPutPart::metadata.getBucket',
|
||||
});
|
||||
return next(err);
|
||||
}
|
||||
return next(null, destinationBucket);
|
||||
}),
|
||||
// Check the bucket authorization.
|
||||
(bucket, next) => {
|
||||
(destinationBucket, next) => {
|
||||
// For validating the request at the destinationBucket level the
|
||||
// `requestType` is the general 'objectPut'.
|
||||
const requestType = 'objectPut';
|
||||
if (!isBucketAuthorized(bucket, requestType, canonicalID)) {
|
||||
if (!isBucketAuthorized(destinationBucket, requestType,
|
||||
canonicalID)) {
|
||||
log.debug('access denied for user on bucket', { requestType });
|
||||
return next(errors.AccessDenied);
|
||||
}
|
||||
return next(null, bucket);
|
||||
return next(null, destinationBucket);
|
||||
},
|
||||
// Get bucket server-side encryption, if it exists.
|
||||
(bucket, next) => {
|
||||
const encryption = bucket.getServerSideEncryption();
|
||||
(destinationBucket, next) => {
|
||||
const encryption = destinationBucket.getServerSideEncryption();
|
||||
// If bucket has server-side encryption, pass the `res` value
|
||||
if (encryption) {
|
||||
return kms.createCipherBundle(encryption, log, (err, res) => {
|
||||
|
@ -118,14 +121,15 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
});
|
||||
return next(err);
|
||||
}
|
||||
return next(null, res);
|
||||
return next(null, destinationBucket, res);
|
||||
});
|
||||
}
|
||||
// The bucket does not have server-side encryption, so pass `null`
|
||||
return next(null, null);
|
||||
return next(null, destinationBucket, null);
|
||||
},
|
||||
// Get the MPU shadow bucket.
|
||||
(cipherBundle, next) => metadata.getBucket(mpuBucketName, log,
|
||||
(destinationBucket, cipherBundle, next) =>
|
||||
metadata.getBucket(mpuBucketName, log,
|
||||
(err, mpuBucket) => {
|
||||
if (err && err.NoSuchBucket) {
|
||||
return next(errors.NoSuchUpload);
|
||||
|
@ -142,10 +146,10 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
if (mpuBucket.getMdBucketModelVersion() < 2) {
|
||||
splitter = constants.oldSplitter;
|
||||
}
|
||||
return next(null, cipherBundle, splitter);
|
||||
return next(null, destinationBucket, cipherBundle, splitter);
|
||||
}),
|
||||
// Check authorization of the MPU shadow bucket.
|
||||
(cipherBundle, splitter, next) => {
|
||||
(destinationBucket, cipherBundle, splitter, next) => {
|
||||
const mpuOverviewKey = _getOverviewKey(splitter, objectKey,
|
||||
uploadId);
|
||||
return metadata.getObjectMD(mpuBucketName, mpuOverviewKey, log,
|
||||
|
@ -163,11 +167,16 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
if (initiatorID !== requesterID) {
|
||||
return next(errors.AccessDenied);
|
||||
}
|
||||
return next(null, cipherBundle, splitter);
|
||||
const objectLocationConstraint =
|
||||
res[constants.objectLocationConstraintHeader];
|
||||
return next(null, destinationBucket,
|
||||
objectLocationConstraint,
|
||||
cipherBundle, splitter);
|
||||
});
|
||||
},
|
||||
// Get any pre-existing part.
|
||||
(cipherBundle, splitter, next) => {
|
||||
(destinationBucket, objectLocationConstraint, cipherBundle,
|
||||
splitter, next) => {
|
||||
const paddedPartNumber = _getPaddedPartNumber(partNumber);
|
||||
const partKey = _getPartKey(uploadId, splitter, paddedPartNumber);
|
||||
return metadata.getObjectMD(mpuBucketName, partKey, log,
|
||||
|
@ -192,19 +201,39 @@ export default function objectPutPart(authInfo, request, streamingV4Params, log,
|
|||
oldLocations = Array.isArray(res.partLocations) ?
|
||||
res.partLocations : [res.partLocations];
|
||||
}
|
||||
return next(null, cipherBundle, partKey, prevObjectSize,
|
||||
oldLocations);
|
||||
return next(null, destinationBucket,
|
||||
objectLocationConstraint, cipherBundle,
|
||||
partKey, prevObjectSize, oldLocations);
|
||||
});
|
||||
},
|
||||
// Store in data backend.
|
||||
(cipherBundle, partKey, prevObjectSize, oldLocations, next) => {
|
||||
(destinationBucket, objectLocationConstraint, cipherBundle,
|
||||
partKey, prevObjectSize, oldLocations, next) => {
|
||||
const objectKeyContext = {
|
||||
bucketName,
|
||||
owner: canonicalID,
|
||||
namespace: request.namespace,
|
||||
objectKey,
|
||||
partNumber: _getPaddedPartNumber(partNumber),
|
||||
uploadId,
|
||||
};
|
||||
const bucketLocationConstraint = destinationBucket
|
||||
.getLocationConstraint();
|
||||
const requestEndpoint = request.parsedHost;
|
||||
if (!BackendInfo.areValidBackendParameters(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint)) {
|
||||
return process.nextTick(() => {
|
||||
next(errors.InvalidArgument
|
||||
.customizeDescription('Location Constraint Info is ' +
|
||||
'invalid.'));
|
||||
});
|
||||
}
|
||||
const backendInfo = new BackendInfo(objectLocationConstraint,
|
||||
bucketLocationConstraint, requestEndpoint);
|
||||
|
||||
return dataStore(objectKeyContext, cipherBundle, request, size,
|
||||
streamingV4Params, log, (err, dataGetInfo, hexDigest) => {
|
||||
streamingV4Params, backendInfo, log,
|
||||
(err, dataGetInfo, hexDigest) => {
|
||||
if (err) {
|
||||
return next(err);
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ export const backend = {
|
|||
return callback(errors.InternalError);
|
||||
}
|
||||
log.debug('finished writing data', { key });
|
||||
return callback(null, key);
|
||||
return callback(null, { key, dataStoreName: 'file' });
|
||||
});
|
||||
return undefined;
|
||||
});
|
||||
|
@ -73,7 +73,8 @@ export const backend = {
|
|||
});
|
||||
},
|
||||
|
||||
get: function getFile(key, range, reqUids, callback) {
|
||||
get: function getFile(objectGetInfo, range, reqUids, callback) {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
const log = createLogger(reqUids);
|
||||
const filePath = getFilePath(key);
|
||||
log.debug('opening readStream to get data', { filePath });
|
||||
|
@ -95,7 +96,8 @@ export const backend = {
|
|||
.on('open', () => { callback(null, rs); });
|
||||
},
|
||||
|
||||
delete: function delFile(key, reqUids, callback) {
|
||||
delete: function delFile(objectGetInfo, reqUids, callback) {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
const log = createLogger(reqUids);
|
||||
const filePath = getFilePath(key);
|
||||
log.debug('deleting file', { filePath });
|
||||
|
|
|
@ -43,12 +43,13 @@ export const backend = {
|
|||
callback(errors.InternalError);
|
||||
} else {
|
||||
ds[count] = { value, keyContext };
|
||||
callback(null, count++);
|
||||
callback(null, { key: count++, dataStoreName: 'mem' });
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
get: function getMem(key, range, reqUids, callback) {
|
||||
get: function getMem(objectGetInfo, range, reqUids, callback) {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
process.nextTick(() => {
|
||||
if (!ds[key]) { return callback(errors.NoSuchKey); }
|
||||
const storedBuffer = ds[key].value;
|
||||
|
@ -80,7 +81,8 @@ export const backend = {
|
|||
});
|
||||
},
|
||||
|
||||
delete: function delMem(key, reqUids, callback) {
|
||||
delete: function delMem(objectGetInfo, reqUids, callback) {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
process.nextTick(() => {
|
||||
delete ds[key];
|
||||
return callback(null);
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
import AWS from 'aws-sdk';
|
||||
import UUID from 'node-uuid';
|
||||
import Sproxy from 'sproxydclient';
|
||||
import { errors } from 'arsenal';
|
||||
import { Logger } from 'werelogs';
|
||||
|
||||
import file from './file/backend';
|
||||
import inMemory from './in_memory/backend';
|
||||
import config from '../Config';
|
||||
|
||||
const logger = new Logger('MultipleBackendGateway', {
|
||||
logLevel: config.log.logLevel,
|
||||
dumpLevel: config.log.dumpLevel,
|
||||
});
|
||||
|
||||
function createLogger(reqUids) {
|
||||
return reqUids ?
|
||||
logger.newRequestLoggerFromSerializedUids(reqUids) :
|
||||
logger.newRequestLogger();
|
||||
}
|
||||
|
||||
function _createAwsKey(requestBucketName, requestObjectKey,
|
||||
partNumber, uploadId) {
|
||||
const unique = UUID.v4();
|
||||
// TODO: Discuss how we want to generate keys. Not having unique feature
|
||||
// is too dangerous since we could have cleanup deletes on a key being
|
||||
// called after a new object was created
|
||||
return `${requestBucketName}/uploadId-${uploadId}/` +
|
||||
`partNumber-${partNumber}/${requestObjectKey}/${unique}`;
|
||||
}
|
||||
|
||||
const clients = {};
|
||||
Object.keys(config.locationConstraints).forEach(location => {
|
||||
const locationObj = config.locationConstraints[location];
|
||||
if (locationObj.type === 'mem') {
|
||||
clients[location] = inMemory;
|
||||
}
|
||||
if (locationObj.type === 'file') {
|
||||
clients[location] = file;
|
||||
}
|
||||
if (locationObj.type === 'scality_s3'
|
||||
&& locationObj.information.connector === 'sproxyd') {
|
||||
clients[location] = new Sproxy({
|
||||
bootstrap: locationObj.information.connector
|
||||
.sproxyd.bootstrap,
|
||||
log: config.log,
|
||||
// Might be undefined which is ok since there is a default
|
||||
// set in sproxydclient if chordCos is undefined
|
||||
chordCos: locationObj.information.connector.sproxyd.chordCos,
|
||||
});
|
||||
}
|
||||
if (locationObj.type === 'aws_s3') {
|
||||
clients[location] = new AWS.S3({
|
||||
endpoint: `https://${locationObj.information.endpoint}`,
|
||||
// Non-file stream objects are not supported with SigV4 (node sdk)
|
||||
// so note that we are using the default of signatureVersion v2
|
||||
|
||||
// consider disabling
|
||||
debug: true,
|
||||
// perhaps use this rather than setting ourselves. Not implemented yet for streams in node sdk!!!
|
||||
computeChecksums: true,
|
||||
credentials: new AWS.SharedIniFileCredentials({ profile:
|
||||
locationObj.information.credentialsProfile }),
|
||||
});
|
||||
clients[location].clientType = 'aws_s3';
|
||||
clients[location].awsBucketName = locationObj.information.bucketName;
|
||||
clients[location].dataStoreName = location;
|
||||
}
|
||||
if (locationObj.type === 'virtual-user-metadata') {
|
||||
// TODO
|
||||
// clients[location] = some sort of bucketclient
|
||||
}
|
||||
});
|
||||
|
||||
const multipleBackendGateway = {
|
||||
put: (stream, size, keyContext, backendInfo, reqUids, callback) => {
|
||||
const controllingLocationConstraint =
|
||||
backendInfo.getControllingLocationConstraint();
|
||||
const client = clients[controllingLocationConstraint];
|
||||
if (!client) {
|
||||
const log = createLogger(reqUids);
|
||||
log.error('no data backend matching controlling locationConstraint',
|
||||
{ controllingLocationConstraint });
|
||||
return process.nextTick(() => {
|
||||
callback(errors.InternalError);
|
||||
});
|
||||
}
|
||||
// client is AWS SDK
|
||||
if (client.clientType === 'aws_s3') {
|
||||
const partNumber = keyContext.partNumber || '00000';
|
||||
const uploadId = keyContext.uploadId || '00000';
|
||||
const awsKey = _createAwsKey(keyContext.bucketName,
|
||||
keyContext.objectKey, partNumber, uploadId);
|
||||
return client.putObject({
|
||||
Bucket: client.awsBucketName,
|
||||
Key: awsKey,
|
||||
Body: stream,
|
||||
ContentLength: size,
|
||||
//Must fix!!! Use this or see if computeChecksums handles it
|
||||
//for us
|
||||
// TODO: This should be in listener to make sure
|
||||
// we have the completedHash. Also, if we pre-encrypt,
|
||||
// this will not work. Need to get hash of encrypted version.
|
||||
// Sending ContentMD5 is needed so that AWS will check to
|
||||
// make sure it is receiving the correct data.
|
||||
// ContentMD5: stream.completedHash,
|
||||
},
|
||||
(err, data) => {
|
||||
if (err) {
|
||||
const log = createLogger(reqUids);
|
||||
log.error('err from data backend',
|
||||
{ err, dataStoreName: client.dataStoreName });
|
||||
// TODO: consider passing through error
|
||||
// rather than translating though could be confusing
|
||||
// (e.g., NoSuchBucket error when request was
|
||||
// actually made to the Scality s3 bucket name)
|
||||
return callback(errors.InternalError);
|
||||
}
|
||||
const dataRetrievalInfo = {
|
||||
key: awsKey,
|
||||
dataStoreName: client.dataStoreName,
|
||||
// because of encryption the ETag here could be
|
||||
// different from our metadata so let's store it
|
||||
dataStoreETag: data.ETag,
|
||||
};
|
||||
return callback(null, dataRetrievalInfo);
|
||||
});
|
||||
}
|
||||
return client.put(stream, size, keyContext,
|
||||
reqUids, callback);
|
||||
},
|
||||
|
||||
get: (objectGetInfo, range, reqUids, callback) => {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
const client = clients[objectGetInfo.dataStoreName];
|
||||
if (client.clientType === 'aws_s3') {
|
||||
return callback(null, client.getObject({
|
||||
Bucket: client.awsBucketName,
|
||||
Key: key,
|
||||
Range: range,
|
||||
}).createReadStream());
|
||||
}
|
||||
return client.get(objectGetInfo, range, reqUids, callback);
|
||||
},
|
||||
|
||||
delete: (objectGetInfo, reqUids, callback) => {
|
||||
const key = objectGetInfo.key ? objectGetInfo.key : objectGetInfo;
|
||||
const client = clients[objectGetInfo.dataStoreName];
|
||||
if (client.clientType === 'aws_s3') {
|
||||
return client.deleteObject({
|
||||
Bucket: client.awsBucketName,
|
||||
Key: key,
|
||||
});
|
||||
}
|
||||
return client.delete(objectGetInfo, reqUids, callback);
|
||||
},
|
||||
|
||||
// checkHealth: to aggregate from multiple?
|
||||
};
|
||||
|
||||
export default multipleBackendGateway;
|
||||
|
||||
// DO WRAPPER STUFF BASED ON REQUEST/STORED INFO
|
||||
//
|
||||
// For GETS and DELETES use objectGetInfo implName
|
||||
|
||||
// For PUTS:
|
||||
// 1) check x-amz-meta-scal-location-constraint on put/copy
|
||||
// 2) bucket location constraint
|
||||
// 3) default for endpoint hit.
|
|
@ -1,8 +1,10 @@
|
|||
import async from 'async';
|
||||
import { errors } from 'arsenal';
|
||||
import Sproxy from 'sproxydclient';
|
||||
|
||||
import file from './file/backend';
|
||||
import inMemory from './in_memory/backend';
|
||||
import multipleBackendGateway from './multipleBackendGateway';
|
||||
import config from '../Config';
|
||||
import MD5Sum from '../utilities/MD5Sum';
|
||||
import assert from 'assert';
|
||||
|
@ -24,11 +26,15 @@ if (config.backends.data === 'mem') {
|
|||
chordCos: config.sproxyd.chordCos,
|
||||
});
|
||||
implName = 'sproxyd';
|
||||
} else if (config.backends.data === 'multiple') {
|
||||
client = multipleBackendGateway;
|
||||
implName = 'multipleBackends';
|
||||
}
|
||||
|
||||
/**
|
||||
* _retryDelete - Attempt to delete key again if it failed previously
|
||||
* @param {string} key - location of the object to delete
|
||||
* @param { string | object } objectGetInfo - either string location of object
|
||||
* to delete or object containing info of object to delete
|
||||
* @param {object} log - Werelogs request logger
|
||||
* @param {number} count - keeps count of number of times function has been run
|
||||
* @param {function} cb - callback
|
||||
|
@ -36,20 +42,20 @@ if (config.backends.data === 'mem') {
|
|||
*/
|
||||
const MAX_RETRY = 2;
|
||||
|
||||
function _retryDelete(key, log, count, cb) {
|
||||
function _retryDelete(objectGetInfo, log, count, cb) {
|
||||
if (count > MAX_RETRY) {
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return client.delete(key, log.getSerializedUids(), err => {
|
||||
return client.delete(objectGetInfo, log.getSerializedUids(), err => {
|
||||
if (err) {
|
||||
return _retryDelete(key, log, count + 1, cb);
|
||||
return _retryDelete(objectGetInfo, log, count + 1, cb);
|
||||
}
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
||||
const data = {
|
||||
put: (cipherBundle, value, valueSize, keyContext, log, cb) => {
|
||||
put: (cipherBundle, value, valueSize, keyContext, backendInfo, log, cb) => {
|
||||
assert.strictEqual(typeof valueSize, 'number');
|
||||
log.debug('sending put to datastore', { implName, keyContext,
|
||||
method: 'put' });
|
||||
|
@ -61,20 +67,33 @@ const data = {
|
|||
writeStream = cipherBundle.cipher;
|
||||
hashedStream.pipe(writeStream);
|
||||
}
|
||||
|
||||
client.put(writeStream, valueSize, keyContext, log.getSerializedUids(),
|
||||
(err, key) => {
|
||||
if (err) {
|
||||
log.error('error from datastore',
|
||||
if (implName === 'multipleBackends') {
|
||||
// Need to send backendInfo to client.put and
|
||||
// client.put will provide dataRetrievalInfo so no
|
||||
// need to construct here
|
||||
return client.put(writeStream, valueSize, keyContext, backendInfo,
|
||||
log.getSerializedUids(), (err, dataRetrievalInfo) => {
|
||||
if (err) {
|
||||
log.error('error from datastore',
|
||||
{ error: err, implName });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
return cb(null, dataRetrievalInfo, hashedStream);
|
||||
});
|
||||
}
|
||||
return client.put(writeStream, valueSize, keyContext,
|
||||
log.getSerializedUids(), (err, key) => {
|
||||
if (err) {
|
||||
log.error('error from datastore',
|
||||
{ error: err, implName });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
const dataRetrievalInfo = {
|
||||
key,
|
||||
dataStoreName: implName,
|
||||
};
|
||||
return cb(null, dataRetrievalInfo, hashedStream);
|
||||
});
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
const dataRetrievalInfo = {
|
||||
key,
|
||||
dataStoreName: implName,
|
||||
};
|
||||
return cb(null, dataRetrievalInfo, hashedStream);
|
||||
});
|
||||
},
|
||||
|
||||
get: (objectGetInfo, log, cb) => {
|
||||
|
@ -84,34 +103,37 @@ const data = {
|
|||
const range = objectGetInfo.range;
|
||||
log.debug('sending get to datastore', { implName, key,
|
||||
range, method: 'get' });
|
||||
client.get(key, range, log.getSerializedUids(), (err, stream) => {
|
||||
if (err) {
|
||||
log.error('error from sproxyd', { error: err });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (objectGetInfo.cipheredDataKey) {
|
||||
const serverSideEncryption = {
|
||||
cryptoScheme: objectGetInfo.cryptoScheme,
|
||||
masterKeyId: objectGetInfo.masterKeyId,
|
||||
cipheredDataKey: Buffer.from(objectGetInfo.cipheredDataKey,
|
||||
'base64'),
|
||||
};
|
||||
const offset = objectGetInfo.range ? objectGetInfo.range[0] : 0;
|
||||
return kms.createDecipherBundle(
|
||||
serverSideEncryption, offset, log,
|
||||
(err, decipherBundle) => {
|
||||
if (err) {
|
||||
log.error('cannot get decipher bundle from kms', {
|
||||
method: 'data.wrapper.data.get',
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
stream.pipe(decipherBundle.decipher);
|
||||
return cb(null, decipherBundle.decipher);
|
||||
});
|
||||
}
|
||||
return cb(null, stream);
|
||||
});
|
||||
client.get(objectGetInfo, range, log.getSerializedUids(),
|
||||
(err, stream) => {
|
||||
if (err) {
|
||||
log.error('error from datastore', { error: err, implName });
|
||||
return cb(errors.InternalError);
|
||||
}
|
||||
if (objectGetInfo.cipheredDataKey) {
|
||||
const serverSideEncryption = {
|
||||
cryptoScheme: objectGetInfo.cryptoScheme,
|
||||
masterKeyId: objectGetInfo.masterKeyId,
|
||||
cipheredDataKey: Buffer.from(
|
||||
objectGetInfo.cipheredDataKey, 'base64'),
|
||||
};
|
||||
const offset = objectGetInfo.range ?
|
||||
objectGetInfo.range[0] : 0;
|
||||
return kms.createDecipherBundle(
|
||||
serverSideEncryption, offset, log,
|
||||
(err, decipherBundle) => {
|
||||
if (err) {
|
||||
log.error('cannot get decipher bundle ' +
|
||||
'from kms', {
|
||||
method: 'data.wrapper.data.get',
|
||||
});
|
||||
return cb(err);
|
||||
}
|
||||
stream.pipe(decipherBundle.decipher);
|
||||
return cb(null, decipherBundle.decipher);
|
||||
});
|
||||
}
|
||||
return cb(null, stream);
|
||||
});
|
||||
},
|
||||
|
||||
delete: (objectGetInfo, log, cb) => {
|
||||
|
@ -124,7 +146,7 @@ const data = {
|
|||
key,
|
||||
method: 'delete',
|
||||
});
|
||||
_retryDelete(key, log, 0, err => {
|
||||
_retryDelete(objectGetInfo, log, 0, err => {
|
||||
if (err) {
|
||||
log.error('error deleting object from datastore',
|
||||
{ error: err, key });
|
||||
|
@ -170,6 +192,7 @@ const data = {
|
|||
return cb(null, defResp);
|
||||
}
|
||||
return client.healthcheck(log, (err, result) => {
|
||||
//NEED TO UPDATE FOR POSSIBILITY OF MULTIPLE HEALTHCHECKS HERE!!
|
||||
const respBody = {};
|
||||
if (err) {
|
||||
log.error(`error from ${implName}`, { error: err });
|
||||
|
|
15
lib/utils.js
15
lib/utils.js
|
@ -72,7 +72,11 @@ utils.getAllRegions = function getAllRegions() {
|
|||
'ap-northeast-1', 'ap-southeast-1', 'ap-southeast-2', 'eu-central-1',
|
||||
'eu-west-1', 'sa-east-1', 'us-east-1', 'us-west-1', 'us-west-2',
|
||||
'us-gov-west-1'];
|
||||
return Object.keys(config.regions).concat(awsOfficialRegions);
|
||||
if (config.regions !== undefined) {
|
||||
return Object.keys(config.regions).concat(awsOfficialRegions);
|
||||
}
|
||||
return Object.keys(config.restEndpoints).map(e =>
|
||||
config.restEndpoints[e]).concat(awsOfficialRegions);
|
||||
};
|
||||
|
||||
/**
|
||||
|
@ -81,9 +85,12 @@ utils.getAllRegions = function getAllRegions() {
|
|||
* @returns {string[]} - list of valid endpoints
|
||||
*/
|
||||
utils.getAllEndpoints = function getAllEndpoints() {
|
||||
return Object.keys(config.regions)
|
||||
.map(r => config.regions[r])
|
||||
.reduce((a, b) => a.concat(b));
|
||||
if (config.regions !== undefined) {
|
||||
return Object.keys(config.regions)
|
||||
.map(r => config.regions[r])
|
||||
.reduce((a, b) => a.concat(b));
|
||||
}
|
||||
return Object.keys(config.restEndpoints);
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
},
|
||||
"homepage": "https://github.com/scality/S3#readme",
|
||||
"dependencies": {
|
||||
"aws-sdk": "^2.2.11",
|
||||
"arsenal": "scality/Arsenal",
|
||||
"async": "~1.4.2",
|
||||
"babel-core": "^6.5.2",
|
||||
|
@ -33,7 +34,7 @@
|
|||
"multilevel": "^7.3.0",
|
||||
"node-uuid": "^1.4.3",
|
||||
"ready-set-stream": "1.0.7",
|
||||
"sproxydclient": "scality/sproxydclient",
|
||||
"sproxydclient": "scality/sproxydclient#ft/multipleBackendGet",
|
||||
"utapi": "scality/utapi",
|
||||
"utf8": "~2.1.1",
|
||||
"vaultclient": "scality/vaultclient",
|
||||
|
@ -44,7 +45,6 @@
|
|||
"ioctl": "2.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"aws-sdk": "^2.2.11",
|
||||
"babel-cli": "^6.2.0",
|
||||
"babel-eslint": "^6.0.0",
|
||||
"bluebird": "^3.3.1",
|
||||
|
|
|
@ -12,7 +12,7 @@ describe('utils.getBucketNameFromHost', () => {
|
|||
'buck.et', 'bu.ck.et', 'bu.ck-et',
|
||||
].forEach(bucket => {
|
||||
const headers = {
|
||||
host: `${bucket}.s3.eu-west-1.amazonaws.com`,
|
||||
host: `${bucket}.s3.amazonaws.com`,
|
||||
};
|
||||
const result = utils.getBucketNameFromHost({ headers });
|
||||
assert.strictEqual(result, bucket);
|
||||
|
@ -36,12 +36,6 @@ describe('utils.getBucketNameFromHost', () => {
|
|||
it('should return undefined when non dns-style', () => {
|
||||
[
|
||||
's3.amazonaws.com',
|
||||
's3.eu.central-1.amazonaws.com',
|
||||
's3.eu-west-1.amazonaws.com',
|
||||
's3-external-1.amazonaws.com',
|
||||
's3.us-east-1.amazonaws.com',
|
||||
's3-us-gov-west-1.amazonaws.com',
|
||||
's3-fips-us-gov-west-1.amazonaws.com',
|
||||
].forEach(host => {
|
||||
const headers = { host };
|
||||
const result = utils.getBucketNameFromHost({ headers });
|
||||
|
@ -97,7 +91,9 @@ describe('utils.getAllRegions', () => {
|
|||
it('should return regions from config', () => {
|
||||
const allRegions = utils.getAllRegions();
|
||||
|
||||
assert(allRegions.indexOf('localregion') >= 0);
|
||||
assert(allRegions.indexOf('scality-us-east-1') >= 0);
|
||||
assert(allRegions.indexOf('scality-us-west-1') >= 0);
|
||||
assert(allRegions.indexOf('aws-us-east-1') >= 0);
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -105,11 +101,10 @@ describe('utils.getAllEndpoints', () => {
|
|||
it('should return endpoints from config', () => {
|
||||
const allEndpoints = utils.getAllEndpoints();
|
||||
|
||||
assert(allEndpoints.indexOf('s3-us-west-2.amazonaws.com') >= 0);
|
||||
assert(allEndpoints.indexOf('127.0.0.1') >= 0);
|
||||
assert(allEndpoints.indexOf('s3.docker.test') >= 0);
|
||||
assert(allEndpoints.indexOf('127.0.0.2') >= 0);
|
||||
assert(allEndpoints.indexOf('s3.amazonaws.com') >= 0);
|
||||
assert(allEndpoints.indexOf('s3-external-1.amazonaws.com') >= 0);
|
||||
assert(allEndpoints.indexOf('s3.us-east-1.amazonaws.com') >= 0);
|
||||
assert(allEndpoints.indexOf('localhost') >= 0);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
Loading…
Reference in New Issue