Compare commits

...

5 Commits

Author SHA1 Message Date
Dora Korpar 48d766a313 [squash] gcputils 2019-01-10 15:39:53 -08:00
Dora Korpar 72144d77c2 [test] increase timeout and add writes 2019-01-10 14:47:26 -08:00
Dora Korpar 5415bcffd1 ft: ZENKO 1402 data backend tests 2019-01-09 17:28:40 -08:00
Dora Korpar e73de18b21 update Arsenal version 2019-01-09 13:47:38 -08:00
Dora Korpar 8e0617f946 ft: ZENKO 1402 move data backends to arsenal 2019-01-09 10:38:32 -08:00
62 changed files with 88 additions and 5327 deletions

View File

@ -1,242 +0,0 @@
const { errors } = require('arsenal');
const crypto = require('crypto');
const constants = require('../../../../constants');
/**
* createAggregateETag - creates ETag from concatenated MPU part ETags to
* mimic AWS
* @param {string} concatETags - string of concatenated MPU part ETags
* @param {array} partList - list of parts to complete MPU with
* @return {string} aggregateETag - final complete MPU obj ETag
*/
function createAggregateETag(concatETags, partList) {
// AWS documentation is unclear on what the MD5 is that it returns
// in the response for a complete multipart upload request.
// The docs state that they might or might not
// return the MD5 of the complete object. It appears
// they are returning the MD5 of the parts' MD5s so that is
// what we have done here. We:
// 1) concatenate the hex version of the
// individual ETags
// 2) convert the concatenated hex to binary
// 3) take the md5 of the binary
// 4) create the hex digest of the md5
// 5) add '-' plus the number of parts at the end
// Convert the concatenated hex ETags to binary
const bufferedHex = Buffer.from(concatETags, 'hex');
// Convert the buffer to a binary string
const binaryString = bufferedHex.toString('binary');
// Get the md5 of the binary string
const md5Hash = crypto.createHash('md5');
md5Hash.update(binaryString, 'binary');
// Get the hex digest of the md5
let aggregateETag = md5Hash.digest('hex');
// Add the number of parts at the end
aggregateETag = `${aggregateETag}-${partList.length}`;
return aggregateETag;
}
/**
* generateMpuPartStorageInfo - generates info needed for storage of
* completed MPU object
* @param {array} filteredPartList - list of parts filtered from metadata
* @return {object} partsInfo - contains three keys: aggregateETag,
* dataLocations, and calculatedSize
*/
function generateMpuPartStorageInfo(filteredPartList) {
// Assemble array of part locations, aggregate size
// and build string to create aggregate ETag
let calculatedSize = 0;
const dataLocations = [];
let concatETags = '';
const partsInfo = {};
filteredPartList.forEach((storedPart, index) => {
const partETagWithoutQuotes =
storedPart.ETag.slice(1, -1);
const dataStoreETag = `${index + 1}:${partETagWithoutQuotes}`;
concatETags += partETagWithoutQuotes;
// If part was put by a regular put part rather than a
// copy it is always one location. With a put part
// copy, could be multiple locations so loop over array
// of locations.
for (let j = 0; j < storedPart.locations.length; j++) {
// If the piece has parts (was a put part object
// copy) each piece will have a size attribute.
// Otherwise, the piece was put by a regular put
// part and the size the of the piece is the full
// part size.
const location = storedPart.locations[j];
// If there is no location, move on
if (!location || typeof location !== 'object') {
continue;
}
let pieceSize = Number.parseInt(storedPart.size, 10);
if (location.size) {
pieceSize = Number.parseInt(location.size, 10);
}
const pieceRetrievalInfo = {
key: location.key,
size: pieceSize,
start: calculatedSize,
dataStoreName: location.dataStoreName,
dataStoreETag,
cryptoScheme: location.sseCryptoScheme,
cipheredDataKey: location.sseCipheredDataKey,
};
dataLocations.push(pieceRetrievalInfo);
// eslint-disable-next-line no-param-reassign
calculatedSize += pieceSize;
}
});
partsInfo.aggregateETag =
createAggregateETag(concatETags, filteredPartList);
partsInfo.dataLocations = dataLocations;
partsInfo.calculatedSize = calculatedSize;
return partsInfo;
}
/**
* validateAndFilterMpuParts - validates part list sent by user and filters
* parts stored in metadata against user part list
* @param {array} storedParts - array of parts stored in metadata
* @param {array} jsonList - array of parts sent by user for completion
* @param {string} mpuOverviewKey - metadata mpu key
* @param {string} splitter - mpu key divider
* @param {object} log - Werelogs instance
* @return {object} filtersPartsObj - contains 3 keys: partList, keysToDelete,
* and extraPartLocations
*/
function validateAndFilterMpuParts(storedParts, jsonList, mpuOverviewKey,
splitter, log) {
let storedPartsCopy = [];
const filteredPartsObj = {};
filteredPartsObj.partList = [];
const keysToDelete = [];
storedParts.forEach(item => {
keysToDelete.push(item.key);
storedPartsCopy.push({
// In order to delete the part listing in the shadow
// bucket, need the full key
key: item.key,
ETag: `"${item.value.ETag}"`,
size: item.value.Size,
locations: Array.isArray(item.value.partLocations) ?
item.value.partLocations : [item.value.partLocations],
});
});
keysToDelete.push(mpuOverviewKey);
// Check list sent to make sure valid
const partLength = jsonList.Part.length;
// A user can put more parts than they end up including
// in the completed MPU but there cannot be more
// parts in the complete message than were already put
if (partLength > storedPartsCopy.length) {
filteredPartsObj.error = errors.InvalidPart;
return filteredPartsObj;
}
let extraParts = [];
const extraPartLocations = [];
for (let i = 0; i < partLength; i++) {
const part = jsonList.Part[i];
const partNumber = Number.parseInt(part.PartNumber[0], 10);
// If the complete list of parts sent with
// the complete multipart upload request is not
// in ascending order return an error
if (i > 0) {
const previousPartNumber =
Number.parseInt(jsonList.Part[i - 1].PartNumber[0], 10);
if (partNumber <= previousPartNumber) {
filteredPartsObj.error = errors.InvalidPartOrder;
return filteredPartsObj;
}
}
let isPartUploaded = false;
while (storedPartsCopy.length > 0 && !isPartUploaded) {
const storedPart = storedPartsCopy[0];
const storedPartNumber =
Number.parseInt(storedPart.key.split(splitter)[1], 10);
if (storedPartNumber === partNumber) {
isPartUploaded = true;
filteredPartsObj.partList.push(storedPart);
let partETag = part.ETag[0].replace(/['"]/g, '');
// some clients send base64, convert to hex
// 32 chars = 16 bytes(2 chars-per-byte) = 128 bits of
// MD5 hex
if (partETag.length !== 32) {
const buffered = Buffer.from(part.ETag[0], 'base64')
.toString('hex');
partETag = `${buffered}`;
}
partETag = `"${partETag}"`;
// If list of parts sent with complete mpu request contains
// a part ETag that does not match the ETag for the part
// stored in metadata, return an error
if (partETag !== storedPart.ETag) {
filteredPartsObj.error = errors.InvalidPart;
return filteredPartsObj;
}
// If any part other than the last part is less than
// 5MB, return an error
const storedPartSize =
Number.parseInt(storedPart.size, 10);
// allow smaller parts for testing
if (process.env.MPU_TESTING) {
log.info('MPU_TESTING env variable setting',
{ setting: process.env.MPU_TESTING });
}
if (process.env.MPU_TESTING !== 'yes' &&
i < jsonList.Part.length - 1 &&
storedPartSize < constants.minimumAllowedPartSize) {
log.debug('part too small on complete mpu');
filteredPartsObj.error = errors.EntityTooSmall;
return filteredPartsObj;
}
storedPartsCopy = storedPartsCopy.splice(1);
} else {
extraParts.push(storedPart);
storedPartsCopy = storedPartsCopy.splice(1);
}
}
if (!isPartUploaded) {
filteredPartsObj.error = errors.InvalidPart;
return filteredPartsObj;
}
}
extraParts = extraParts.concat(storedPartsCopy);
// if extra parts, need to delete the data when done with completing
// mpu so extract the info to delete here
if (extraParts.length > 0) {
extraParts.forEach(part => {
const locations = part.locations;
locations.forEach(location => {
if (!location || typeof location !== 'object') {
return;
}
extraPartLocations.push(location);
});
});
}
filteredPartsObj.keysToDelete = keysToDelete;
filteredPartsObj.extraPartLocations = extraPartLocations;
return filteredPartsObj;
}
module.exports = {
generateMpuPartStorageInfo,
validateAndFilterMpuParts,
createAggregateETag,
};

View File

@ -1,12 +1,11 @@
const async = require('async');
const arsenal = require('arsenal');
const { parseString } = require('xml2js');
const { errors, versioning, s3middleware } = require('arsenal');
const convertToXml = s3middleware.convertToXml;
const { pushMetric } = require('../utapi/utilities');
const getReplicationInfo = require('./apiUtils/object/getReplicationInfo');
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
require('./apiUtils/object/processMpuParts');
const { config } = require('../Config');
const multipleBackendGateway = require('../data/multipleBackendGateway');
@ -18,9 +17,11 @@ const { versioningPreprocessing, checkQueryVersionId }
const metadata = require('../metadata/wrapper');
const services = require('../services');
const { metadataValidateBucketAndObj } = require('../metadata/metadataUtils');
const { skipMpuPartProcessing } = require('../data/external/utils');
const { skipMpuPartProcessing } = arsenal.storage.data.external.backendUtils;
const locationConstraintCheck
= require('./apiUtils/object/locationConstraintCheck');
const { validateAndFilterMpuParts, generateMpuPartStorageInfo } =
s3middleware.processMpuParts;
const logger = require('../utilities/logger');

View File

@ -1,621 +0,0 @@
const AWS = require('aws-sdk');
const { errors, s3middleware } = require('arsenal');
const werelogs = require('werelogs');
const MD5Sum = s3middleware.MD5Sum;
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
const createLogger = require('../multipleBackendLogger');
const { prepareStream } = require('../../api/apiUtils/object/prepareStream');
const { logHelper, removeQuotes, trimXMetaPrefix } = require('./utils');
const { config } = require('../../Config');
const missingVerIdInternalError = errors.InternalError.customizeDescription(
'Invalid state. Please ensure versioning is enabled ' +
'in AWS for the location constraint and try again.'
);
class AwsClient {
constructor(config) {
this.clientType = 'aws_s3';
this.type = 'AWS';
this._s3Params = config.s3Params;
this._awsBucketName = config.bucketName;
this._bucketMatch = config.bucketMatch;
this._dataStoreName = config.dataStoreName;
this._serverSideEncryption = config.serverSideEncryption;
this._supportsVersioning = config.supportsVersioning;
this._client = new AWS.S3(this._s3Params);
this._logger = new werelogs.Logger('AwsClient');
}
setup(cb) {
// this request implicitly updates the endpoint for the location
// the following code explcitly sets it to avoid surprises
this._client.getBucketLocation({ Bucket: this._awsBucketName },
(err, res) => {
if (err && err.code !== 'AuthorizationHeaderMalformed') {
this._logger.error('error during setup', {
error: err,
method: 'AwsClient.setup',
});
return cb(err);
}
let region;
if (err && err.code === 'AuthorizationHeaderMalformed') {
// set regional endpoint
region = err.region;
} else if (res) {
region = res.LocationConstraint;
}
const isAWS = this._s3Params.endpoint.endsWith('amazonaws.com');
if (region && isAWS) {
const endpoint = `s3.${region}.amazonaws.com`;
this._logger.debug('setting regional endpoint', {
method: 'AwsClient.setup',
region,
endpoint,
});
this._client.endpoint = new AWS.Endpoint(endpoint);
}
return cb();
});
}
_createAwsKey(requestBucketName, requestObjectKey,
bucketMatch) {
if (bucketMatch) {
return requestObjectKey;
}
return `${requestBucketName}/${requestObjectKey}`;
}
toObjectGetInfo(objectKey, bucketName) {
return {
key: this._createAwsKey(bucketName, objectKey, this._bucketMatch),
dataStoreName: this._dataStoreName,
};
}
put(stream, size, keyContext, reqUids, callback) {
const awsKey = this._createAwsKey(keyContext.bucketName,
keyContext.objectKey, this._bucketMatch);
const metaHeaders = trimXMetaPrefix(keyContext.metaHeaders);
const log = createLogger(reqUids);
const putCb = (err, data) => {
if (err) {
logHelper(log, 'error', 'err from data backend',
err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
if (!data.VersionId && this._supportsVersioning) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName, this.clientType);
return callback(missingVerIdInternalError);
}
const dataStoreVersionId = data.VersionId;
return callback(null, awsKey, dataStoreVersionId);
};
const params = {
Bucket: this._awsBucketName,
Key: awsKey,
};
// we call data.put to create a delete marker, but it's actually a
// delete request in call to AWS
if (keyContext.isDeleteMarker) {
return this._client.deleteObject(params, putCb);
}
const uploadParams = params;
uploadParams.Metadata = metaHeaders;
uploadParams.ContentLength = size;
if (this._serverSideEncryption) {
uploadParams.ServerSideEncryption = 'AES256';
}
if (keyContext.tagging) {
uploadParams.Tagging = keyContext.tagging;
}
if (keyContext.contentType !== undefined) {
uploadParams.ContentType = keyContext.contentType;
}
if (keyContext.cacheControl !== undefined) {
uploadParams.CacheControl = keyContext.cacheControl;
}
if (keyContext.contentDisposition !== undefined) {
uploadParams.ContentDisposition = keyContext.contentDisposition;
}
if (keyContext.contentEncoding !== undefined) {
uploadParams.ContentEncoding = keyContext.contentEncoding;
}
if (!stream) {
return this._client.putObject(uploadParams, putCb);
}
uploadParams.Body = stream;
return this._client.upload(uploadParams, putCb);
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key, dataStoreVersionId } = objectGetInfo;
return this._client.headObject({
Bucket: this._awsBucketName,
Key: key,
VersionId: dataStoreVersionId,
}, err => {
if (err) {
logHelper(log, 'error', 'error heading object ' +
'from datastore', err, this._dataStoreName);
if (err.code === 'NotFound') {
const error = errors.ServiceUnavailable
.customizeDescription(
`Unexpected error from ${this.type}: ` +
`"NotFound". Data on ${this.type} ` +
'may have been altered outside of CloudServer.'
);
return callback(error);
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback();
});
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
const { key, dataStoreVersionId } = objectGetInfo;
const request = this._client.getObject({
Bucket: this._awsBucketName,
Key: key,
VersionId: dataStoreVersionId,
Range: range ? `bytes=${range[0]}-${range[1]}` : null,
}).on('success', response => {
log.trace(`${this.type} GET request response headers`,
{ responseHeaders: response.httpResponse.headers,
backendType: this.clientType });
});
const stream = request.createReadStream().on('error', err => {
logHelper(log, 'error',
`error streaming data from ${this.type}`,
err, this._dataStoreName, this.clientType);
return callback(err);
});
return callback(null, stream);
}
delete(objectGetInfo, reqUids, callback) {
const { key, dataStoreVersionId, deleteVersion } = objectGetInfo;
const log = createLogger(reqUids);
const params = {
Bucket: this._awsBucketName,
Key: key,
};
if (deleteVersion) {
params.VersionId = dataStoreVersionId;
}
return this._client.deleteObject(params, err => {
if (err) {
logHelper(log, 'error', 'error deleting object from ' +
'datastore', err, this._dataStoreName, this.clientType);
if (err.code === 'NoSuchVersion' || err.code === 'NoSuchKey') {
// data may have been deleted directly from the AWS backend
// don't want to retry the delete and errors are not
// sent back to client anyway, so no need to return err
return callback();
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback();
});
}
healthcheck(location, callback) {
const awsResp = {};
this._client.headBucket({ Bucket: this._awsBucketName },
err => {
/* eslint-disable no-param-reassign */
if (err) {
awsResp[location] = { error: err, external: true };
return callback(null, awsResp);
}
if (!this._supportsVersioning) {
awsResp[location] = {
message: 'Congrats! You own the bucket',
};
return callback(null, awsResp);
}
return this._client.getBucketVersioning({
Bucket: this._awsBucketName },
(err, data) => {
if (err) {
awsResp[location] = { error: err, external: true };
} else if (!data.Status ||
data.Status === 'Suspended') {
awsResp[location] = {
versioningStatus: data.Status,
error: 'Versioning must be enabled',
external: true,
};
} else {
awsResp[location] = {
versioningStatus: data.Status,
message: 'Congrats! You own the bucket',
};
}
return callback(null, awsResp);
});
});
}
createMPU(key, metaHeaders, bucketName, websiteRedirectHeader, contentType,
cacheControl, contentDisposition, contentEncoding, log, callback) {
const metaHeadersTrimmed = {};
Object.keys(metaHeaders).forEach(header => {
if (header.startsWith('x-amz-meta-')) {
const headerKey = header.substring(11);
metaHeadersTrimmed[headerKey] = metaHeaders[header];
}
});
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = {
Bucket: awsBucket,
Key: awsKey,
WebsiteRedirectLocation: websiteRedirectHeader,
Metadata: metaHeadersTrimmed,
ContentType: contentType,
CacheControl: cacheControl,
ContentDisposition: contentDisposition,
ContentEncoding: contentEncoding,
};
return this._client.createMultipartUpload(params, (err, mpuResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend',
err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback(null, mpuResObj);
});
}
uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) {
let hashedStream = stream;
if (request) {
const partStream = prepareStream(request, streamingV4Params,
log, callback);
hashedStream = new MD5Sum();
partStream.pipe(hashedStream);
}
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = { Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
Body: hashedStream, ContentLength: size,
PartNumber: partNumber };
return this._client.uploadPart(params, (err, partResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend ' +
'on uploadPart', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
// Because we manually add quotes to ETag later, remove quotes here
const noQuotesETag =
partResObj.ETag.substring(1, partResObj.ETag.length - 1);
const dataRetrievalInfo = {
key: awsKey,
dataStoreType: 'aws_s3',
dataStoreName: this._dataStoreName,
dataStoreETag: noQuotesETag,
};
return callback(null, dataRetrievalInfo);
});
}
listParts(key, uploadId, bucketName, partNumberMarker, maxParts, log,
callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const params = { Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
PartNumberMarker: partNumberMarker, MaxParts: maxParts };
return this._client.listParts(params, (err, partList) => {
if (err) {
logHelper(log, 'error', 'err from data backend on listPart',
err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
// build storedParts object to mimic Scality S3 backend returns
const storedParts = {};
storedParts.IsTruncated = partList.IsTruncated;
storedParts.Contents = [];
storedParts.Contents = partList.Parts.map(item => {
// We manually add quotes to ETag later, so remove quotes here
const noQuotesETag =
item.ETag.substring(1, item.ETag.length - 1);
return {
partNumber: item.PartNumber,
value: {
Size: item.Size,
ETag: noQuotesETag,
LastModified: item.LastModified,
},
};
});
return callback(null, storedParts);
});
}
/**
* completeMPU - complete multipart upload on AWS backend
* @param {object} jsonList - user-sent list of parts to include in
* final mpu object
* @param {object} mdInfo - object containing 3 keys: storedParts,
* mpuOverviewKey, and splitter
* @param {string} key - object key
* @param {string} uploadId - multipart upload id string
* @param {string} bucketName - name of bucket
* @param {RequestLogger} log - logger instance
* @param {function} callback - callback function
* @return {(Error|object)} - return Error if complete MPU fails, otherwise
* object containing completed object key, eTag, and contentLength
*/
completeMPU(jsonList, mdInfo, key, uploadId, bucketName, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const mpuError = {
InvalidPart: true,
InvalidPartOrder: true,
EntityTooSmall: true,
};
const partArray = [];
const partList = jsonList.Part;
partList.forEach(partObj => {
const partParams = { PartNumber: partObj.PartNumber[0],
ETag: partObj.ETag[0] };
partArray.push(partParams);
});
const mpuParams = {
Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
MultipartUpload: {
Parts: partArray,
},
};
const completeObjData = { key: awsKey };
return this._client.completeMultipartUpload(mpuParams,
(err, completeMpuRes) => {
if (err) {
if (mpuError[err.code]) {
logHelper(log, 'trace', 'err from data backend on ' +
'completeMPU', err, this._dataStoreName, this.clientType);
return callback(errors[err.code]);
}
logHelper(log, 'error', 'err from data backend on ' +
'completeMPU', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
if (!completeMpuRes.VersionId && this._supportsVersioning) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName, this.clientType);
return callback(missingVerIdInternalError);
}
// need to get content length of new object to store
// in our metadata
return this._client.headObject({ Bucket: awsBucket, Key: awsKey },
(err, objHeaders) => {
if (err) {
logHelper(log, 'trace', 'err from data backend on ' +
'headObject', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
// remove quotes from eTag because they're added later
completeObjData.eTag = completeMpuRes.ETag
.substring(1, completeMpuRes.ETag.length - 1);
completeObjData.dataStoreVersionId = completeMpuRes.VersionId;
completeObjData.contentLength =
Number.parseInt(objHeaders.ContentLength, 10);
return callback(null, completeObjData);
});
});
}
abortMPU(key, uploadId, bucketName, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucketName, key, this._bucketMatch);
const abortParams = {
Bucket: awsBucket, Key: awsKey, UploadId: uploadId,
};
return this._client.abortMultipartUpload(abortParams, err => {
if (err) {
logHelper(log, 'error', 'There was an error aborting ' +
'the MPU on AWS S3. You should abort directly on AWS S3 ' +
'using the same uploadId.', err, this._dataStoreName,
this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback();
});
}
objectPutTagging(key, bucket, objectMD, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucket, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
const tagParams = {
Bucket: awsBucket,
Key: awsKey,
VersionId: dataStoreVersionId,
};
const keyArray = Object.keys(objectMD.tags);
tagParams.Tagging = {};
tagParams.Tagging.TagSet = keyArray.map(key => {
const value = objectMD.tags[key];
return { Key: key, Value: value };
});
return this._client.putObjectTagging(tagParams, err => {
if (err) {
logHelper(log, 'error', 'error from data backend on ' +
'putObjectTagging', err,
this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback();
});
}
objectDeleteTagging(key, bucket, objectMD, log, callback) {
const awsBucket = this._awsBucketName;
const awsKey = this._createAwsKey(bucket, key, this._bucketMatch);
const dataStoreVersionId = objectMD.location[0].dataStoreVersionId;
const tagParams = {
Bucket: awsBucket,
Key: awsKey,
VersionId: dataStoreVersionId,
};
return this._client.deleteObjectTagging(tagParams, err => {
if (err) {
logHelper(log, 'error', 'error from data backend on ' +
'deleteObjectTagging', err,
this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
return callback();
});
}
copyObject(request, destLocationConstraintName, sourceKey,
sourceLocationConstraintName, storeMetadataParams, log, callback) {
const destBucketName = request.bucketName;
const destObjectKey = request.objectKey;
const destAwsKey = this._createAwsKey(destBucketName, destObjectKey,
this._bucketMatch);
const sourceAwsBucketName =
config.getAwsBucketName(sourceLocationConstraintName);
const metadataDirective = request.headers['x-amz-metadata-directive'];
const metaHeaders = trimXMetaPrefix(getMetaHeaders(request.headers));
const awsParams = {
Bucket: this._awsBucketName,
Key: destAwsKey,
CopySource: `${sourceAwsBucketName}/${sourceKey}`,
Metadata: metaHeaders,
MetadataDirective: metadataDirective,
};
if (destLocationConstraintName &&
config.isAWSServerSideEncrytion(destLocationConstraintName)) {
awsParams.ServerSideEncryption = 'AES256';
}
this._client.copyObject(awsParams, (err, copyResult) => {
if (err) {
if (err.code === 'AccessDenied') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceAwsBucketName} ${this.type} bucket`, err,
this._dataStoreName, this.clientType);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceAwsBucketName} ${this.type} bucket`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'copyObject', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
if (!copyResult.VersionId && this._supportsVersioning) {
this._logger.debug('No VersionId found in response, ' +
'calling headObject to resolve');
return this._client.headObject({
Bucket: this._awsBucketName,
Key: destAwsKey,
}, (err, data) => {
if (err || !data.VersionId) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName, this.clientType);
return callback(missingVerIdInternalError);
}
return callback(null, destAwsKey, data.VersionId);
});
}
return callback(null, destAwsKey, copyResult.VersionId);
});
}
uploadPartCopy(request, awsSourceKey, sourceLocationConstraintName,
log, callback) {
const destBucketName = request.bucketName;
const destObjectKey = request.objectKey;
const destAwsKey = this._createAwsKey(destBucketName, destObjectKey,
this._bucketMatch);
const sourceAwsBucketName =
config.getAwsBucketName(sourceLocationConstraintName);
const uploadId = request.query.uploadId;
const partNumber = request.query.partNumber;
const copySourceRange = request.headers['x-amz-copy-source-range'];
const params = {
Bucket: this._awsBucketName,
CopySource: `${sourceAwsBucketName}/${awsSourceKey}`,
CopySourceRange: copySourceRange,
Key: destAwsKey,
PartNumber: partNumber,
UploadId: uploadId,
};
return this._client.uploadPartCopy(params, (err, res) => {
if (err) {
if (err.code === 'AccessDenied') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceAwsBucketName} AWS bucket`, err,
this._dataStoreName, this.clientType);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceAwsBucketName} AWS bucket`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'uploadPartCopy', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`${this.type}: ${err.message}`)
);
}
const eTag = removeQuotes(res.CopyPartResult.ETag);
return callback(null, eTag);
});
}
}
module.exports = AwsClient;

View File

@ -1,449 +0,0 @@
const url = require('url');
const { errors, s3middleware } = require('arsenal');
const azure = require('azure-storage');
const createLogger = require('../multipleBackendLogger');
const { logHelper, translateAzureMetaHeaders } = require('./utils');
const { config } = require('../../Config');
const { validateAndFilterMpuParts } =
require('../../api/apiUtils/object/processMpuParts');
const constants = require('../../../constants');
const metadata = require('../../metadata/wrapper');
const packageVersion = require('../../../package.json').version;
const azureMpuUtils = s3middleware.azureHelper.mpuUtils;
azure.Constants.USER_AGENT_PRODUCT_NAME = constants.productName;
azure.Constants.USER_AGENT_PRODUCT_VERSION = packageVersion;
class AzureClient {
constructor(config) {
this._azureStorageEndpoint = config.azureStorageEndpoint;
this._azureStorageCredentials = config.azureStorageCredentials;
this._azureContainerName = config.azureContainerName;
this._client = azure.createBlobService(
this._azureStorageCredentials.storageAccountName,
this._azureStorageCredentials.storageAccessKey,
this._azureStorageEndpoint);
this._dataStoreName = config.dataStoreName;
this._bucketMatch = config.bucketMatch;
if (config.proxy && config.proxy.url) {
const parsedUrl = url.parse(config.proxy.url);
if (!parsedUrl.port) {
parsedUrl.port = 80;
}
const proxyParams = parsedUrl;
if (config.proxy.certs) {
Object.assign(proxyParams, config.proxy.certs);
}
this._client.setProxy(proxyParams);
}
}
_errorWrapper(s3Method, azureMethod, args, log, cb) {
if (log) {
log.info(`calling azure ${azureMethod}`);
}
try {
this._client[azureMethod].apply(this._client, args);
} catch (err) {
const error = errors.ServiceUnavailable;
if (log) {
log.error('error thrown by Azure Storage Client Library',
{ error: err.message, stack: err.stack, s3Method,
azureMethod, dataStoreName: this._dataStoreName });
}
cb(error.customizeDescription('Error from Azure ' +
`method: ${azureMethod} on ${s3Method} S3 call: ` +
`${err.message}`));
}
}
_createAzureKey(requestBucketName, requestObjectKey,
bucketMatch) {
if (bucketMatch) {
return requestObjectKey;
}
return `${requestBucketName}/${requestObjectKey}`;
}
_getMetaHeaders(objectMD) {
const metaHeaders = {};
Object.keys(objectMD).forEach(mdKey => {
const isMetaHeader = mdKey.startsWith('x-amz-meta-');
if (isMetaHeader) {
metaHeaders[mdKey] = objectMD[mdKey];
}
});
return translateAzureMetaHeaders(metaHeaders);
}
// Before putting or deleting object on Azure, check if MPU exists with
// same key name. If it does, do not allow put or delete because Azure
// will delete all blocks with same key name
protectAzureBlocks(bucketName, objectKey, dataStoreName, log, cb) {
const mpuBucketName = `${constants.mpuBucketPrefix}${bucketName}`;
const splitter = constants.splitter;
const listingParams = {
prefix: `overview${splitter}${objectKey}`,
listingType: 'MPU',
splitter,
maxKeys: 1,
};
return metadata.listMultipartUploads(mpuBucketName, listingParams,
log, (err, mpuList) => {
if (err && !err.NoSuchBucket) {
log.error('Error listing MPUs for Azure delete',
{ error: err, dataStoreName });
return cb(errors.ServiceUnavailable);
}
if (mpuList && mpuList.Uploads && mpuList.Uploads.length > 0) {
const error = errors.MPUinProgress;
log.error('Error: cannot put/delete object to Azure with ' +
'same key name as ongoing MPU on Azure',
{ error, dataStoreName });
return cb(error);
}
// If listMultipartUploads returns a NoSuchBucket error or the
// mpu list is empty, there are no conflicting MPUs, so continue
return cb();
});
}
toObjectGetInfo(objectKey, bucketName) {
return {
key: this._createAzureKey(bucketName, objectKey, this._bucketMatch),
dataStoreName: this._dataStoreName,
};
}
put(stream, size, keyContext, reqUids, callback) {
const log = createLogger(reqUids);
// before blob is put, make sure there is no ongoing MPU with same key
this.protectAzureBlocks(keyContext.bucketName,
keyContext.objectKey, this._dataStoreName, log, err => {
// if error returned, there is ongoing MPU, so do not put
if (err) {
return callback(err.customizeDescription(
`Error putting object to Azure: ${err.message}`));
}
const azureKey = this._createAzureKey(keyContext.bucketName,
keyContext.objectKey, this._bucketMatch);
const options = {
metadata: translateAzureMetaHeaders(keyContext.metaHeaders,
keyContext.tagging),
contentSettings: {
contentType: keyContext.contentType || undefined,
cacheControl: keyContext.cacheControl || undefined,
contentDisposition: keyContext.contentDisposition ||
undefined,
contentEncoding: keyContext.contentEncoding || undefined,
},
};
if (size === 0) {
return this._errorWrapper('put', 'createBlockBlobFromText',
[this._azureContainerName, azureKey, '', options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
}
return this._errorWrapper('put', 'createBlockBlobFromStream',
[this._azureContainerName, azureKey, stream, size, options,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure PUT data ' +
'backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback(null, azureKey);
}], log, callback);
});
}
head(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const { key, azureStreamingOptions } = objectGetInfo;
return this._errorWrapper('head', 'getBlobProperties',
[this._azureContainerName, key, azureStreamingOptions,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure HEAD data backend',
err, this._dataStoreName);
if (err.code === 'NotFound') {
const error = errors.ServiceUnavailable
.customizeDescription(
'Unexpected error from Azure: "NotFound". Data ' +
'on Azure may have been altered outside of ' +
'CloudServer.');
return callback(error);
}
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const { key, response, azureStreamingOptions } = objectGetInfo;
let streamingOptions;
if (azureStreamingOptions) {
// option coming from api.get()
streamingOptions = azureStreamingOptions;
} else if (range) {
// option coming from multipleBackend.upload()
const rangeStart = range[0] ? range[0].toString() : undefined;
const rangeEnd = range[1] ? range[1].toString() : undefined;
streamingOptions = { rangeStart, rangeEnd };
}
this._errorWrapper('get', 'getBlobToStream',
[this._azureContainerName, key, response, streamingOptions,
err => {
if (err) {
logHelper(log, 'error', 'err from Azure GET data backend',
err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback(null, response);
}], log, callback);
}
delete(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
// for backwards compatibility
const key = typeof objectGetInfo === 'string' ? objectGetInfo :
objectGetInfo.key;
return this._errorWrapper('delete', 'deleteBlobIfExists',
[this._azureContainerName, key,
err => {
if (err) {
const log = createLogger(reqUids);
logHelper(log, 'error', 'error deleting object from ' +
'Azure datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
return callback();
}], log, callback);
}
healthcheck(location, callback, flightCheckOnStartUp) {
const azureResp = {};
const healthCheckAction = flightCheckOnStartUp ?
'createContainerIfNotExists' : 'doesContainerExist';
this._errorWrapper('checkAzureHealth', healthCheckAction,
[this._azureContainerName, err => {
/* eslint-disable no-param-reassign */
if (err) {
azureResp[location] = { error: err.message,
external: true };
return callback(null, azureResp);
}
azureResp[location] = {
message:
'Congrats! You can access the Azure storage account',
};
return callback(null, azureResp);
}], null, callback);
}
uploadPart(request, streamingV4Params, partStream, size, key, uploadId,
partNumber, bucket, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const params = { bucketName: this._azureContainerName,
partNumber, size, objectKey: azureKey, uploadId };
const stream = request || partStream;
if (request && request.headers && request.headers['content-md5']) {
params.contentMD5 = request.headers['content-md5'];
}
const dataRetrievalInfo = {
key: azureKey,
partNumber,
dataStoreName: this._dataStoreName,
dataStoreType: 'azure',
numberSubParts: azureMpuUtils.getSubPartInfo(size)
.expectedNumberSubParts,
};
if (size === 0) {
log.debug('0-byte part does not store data',
{ method: 'uploadPart' });
dataRetrievalInfo.dataStoreETag = azureMpuUtils.zeroByteETag;
dataRetrievalInfo.numberSubParts = 0;
return callback(null, dataRetrievalInfo);
}
if (size <= azureMpuUtils.maxSubPartSize) {
const errorWrapperFn = this._errorWrapper.bind(this);
return azureMpuUtils.putSinglePart(errorWrapperFn,
stream, params, this._dataStoreName, log, (err, dataStoreETag) => {
if (err) {
return callback(err);
}
dataRetrievalInfo.dataStoreETag = dataStoreETag;
return callback(null, dataRetrievalInfo);
});
}
const errorWrapperFn = this._errorWrapper.bind(this);
return azureMpuUtils.putSubParts(errorWrapperFn, stream,
params, this._dataStoreName, log, (err, dataStoreETag) => {
if (err) {
return callback(err);
}
dataRetrievalInfo.dataStoreETag = dataStoreETag;
return callback(null, dataRetrievalInfo);
});
}
completeMPU(jsonList, mdInfo, key, uploadId, bucket, metaHeaders,
contentSettings, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const commitList = {
UncommittedBlocks: jsonList.uncommittedBlocks || [],
};
let filteredPartsObj;
if (!jsonList.uncommittedBlocks) {
const { storedParts, mpuOverviewKey, splitter } = mdInfo;
filteredPartsObj = validateAndFilterMpuParts(storedParts, jsonList,
mpuOverviewKey, splitter, log);
filteredPartsObj.partList.forEach(part => {
// part.locations is always array of 1, which contains data info
const subPartIds =
azureMpuUtils.getSubPartIds(part.locations[0], uploadId);
commitList.UncommittedBlocks.push(...subPartIds);
});
}
const options = {
contentSettings,
metadata: metaHeaders ? translateAzureMetaHeaders(metaHeaders) :
undefined,
};
this._errorWrapper('completeMPU', 'commitBlocks',
[this._azureContainerName, azureKey, commitList, options,
err => {
if (err) {
logHelper(log, 'error', 'err completing MPU on Azure ' +
'datastore', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`Azure: ${err.message}`));
}
const completeObjData = {
key: azureKey,
filteredPartsObj,
};
return callback(null, completeObjData);
}], log, callback);
}
objectPutTagging(key, bucket, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
azureMD.tags = JSON.stringify(objectMD.tags);
this._errorWrapper('objectPutTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
objectDeleteTagging(key, bucket, objectMD, log, callback) {
const azureKey = this._createAzureKey(bucket, key, this._bucketMatch);
const azureMD = this._getMetaHeaders(objectMD);
this._errorWrapper('objectDeleteTagging', 'setBlobMetadata',
[this._azureContainerName, azureKey, azureMD,
err => {
if (err) {
logHelper(log, 'error', 'err putting object tags to ' +
'Azure backend', err, this._dataStoreName);
return callback(errors.ServiceUnavailable);
}
return callback();
}], log, callback);
}
copyObject(request, destLocationConstraintName, sourceKey,
sourceLocationConstraintName, storeMetadataParams, log, callback) {
const destContainerName = request.bucketName;
const destObjectKey = request.objectKey;
const destAzureKey = this._createAzureKey(destContainerName,
destObjectKey, this._bucketMatch);
const sourceContainerName =
config.locationConstraints[sourceLocationConstraintName]
.details.azureContainerName;
let options;
if (storeMetadataParams.metaHeaders) {
options = { metadata:
translateAzureMetaHeaders(storeMetadataParams.metaHeaders) };
}
this._errorWrapper('copyObject', 'startCopyBlob',
[`${this._azureStorageEndpoint}` +
`${sourceContainerName}/${sourceKey}`,
this._azureContainerName, destAzureKey, options,
(err, res) => {
if (err) {
if (err.code === 'CannotVerifyCopySource') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceContainerName} Azure Container`, err,
this._dataStoreName);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceContainerName} Azure Container`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'copyObject', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS: ${err.message}`)
);
}
if (res.copy.status === 'pending') {
logHelper(log, 'error', 'Azure copy status is pending',
err, this._dataStoreName);
const copyId = res.copy.id;
this._client.abortCopyBlob(this._azureContainerName,
destAzureKey, copyId, err => {
if (err) {
logHelper(log, 'error', 'error from data backend ' +
'on abortCopyBlob', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`AWS on abortCopyBlob: ${err.message}`)
);
}
return callback(errors.InvalidObjectState
.customizeDescription('Error: Azure copy status was ' +
'pending. It has been aborted successfully')
);
});
}
return callback(null, destAzureKey);
}], log, callback);
}
}
module.exports = AzureClient;

View File

@ -1,33 +0,0 @@
const { errors } = require('arsenal');
const MpuHelper = require('./mpuHelper');
const { createMpuKey, logger } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* abortMPU - remove all objects of a GCP Multipart Upload
* @param {object} params - abortMPU params
* @param {string} params.Bucket - bucket name
* @param {string} params.MPU - mpu bucket name
* @param {string} params.Key - object key
* @param {number} params.UploadId - MPU upload id
* @param {function} callback - callback function to call
* @return {undefined}
*/
function abortMPU(params, callback) {
if (!params || !params.Key || !params.UploadId ||
!params.Bucket || !params.MPU) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in abortMultipartUpload', error);
return callback(error);
}
const mpuHelper = new MpuHelper(this);
const delParams = {
Bucket: params.Bucket,
MPU: params.MPU,
Prefix: createMpuKey(params.Key, params.UploadId),
};
return mpuHelper.removeParts(delParams, callback);
}
module.exports = abortMPU;

View File

@ -1,69 +0,0 @@
const async = require('async');
const { errors } = require('arsenal');
const MpuHelper = require('./mpuHelper');
const { createMpuKey, logger } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* completeMPU - merges a list of parts into a single object
* @param {object} params - completeMPU params
* @param {string} params.Bucket - bucket name
* @param {string} params.MPU - mpu bucket name
* @param {string} params.Key - object key
* @param {number} params.UploadId - MPU upload id
* @param {Object} params.MultipartUpload - MPU upload object
* @param {Object[]} param.MultipartUpload.Parts - a list of parts to merge
* @param {function} callback - callback function to call with MPU result
* @return {undefined}
*/
function completeMPU(params, callback) {
if (!params || !params.MultipartUpload ||
!params.MultipartUpload.Parts || !params.UploadId ||
!params.Bucket || !params.Key) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in completeMultipartUpload', error);
return callback(error);
}
const partList = params.MultipartUpload.Parts;
// verify that the part list is in order
if (params.MultipartUpload.Parts.length === 0) {
const error = errors.InvalidRequest
.customizeDescription('You must specify at least one part');
logHelper(logger, 'error', 'error in completeMultipartUpload', error);
return callback(error);
}
for (let ind = 1; ind < partList.length; ++ind) {
if (partList[ind - 1].PartNumber >= partList[ind].PartNumber) {
logHelper(logger, 'error', 'error in completeMultipartUpload',
errors.InvalidPartOrder);
return callback(errors.InvalidPartOrder);
}
}
const mpuHelper = new MpuHelper(this); // this === GcpClient
return async.waterfall([
next => {
// first compose: in mpu bucket
// max 10,000 => 313 parts
// max component count per object 32
logger.trace('completeMultipartUpload: compose',
{ partCount: partList.length });
mpuHelper.splitMerge(params, partList, 'compose', next);
},
(numParts, next) => mpuHelper.composeFinal(numParts, params, next),
(result, next) => mpuHelper.generateMpuResult(result, partList, next),
(result, aggregateETag, next) =>
mpuHelper.copyToMain(result, aggregateETag, params, next),
(mpuResult, next) => {
const delParams = {
Bucket: params.Bucket,
MPU: params.MPU,
Prefix: createMpuKey(params.Key, params.UploadId),
};
return mpuHelper.removeParts(delParams,
err => next(err, mpuResult));
},
], callback);
}
module.exports = completeMPU;

View File

@ -1,51 +0,0 @@
const uuid = require('uuid/v4');
const { errors } = require('arsenal');
const { createMpuKey, logger, getPutTagsMetadata } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* createMPU - creates a MPU upload on GCP (sets a 0-byte object placeholder
* with for the final composed object)
* @param {object} params - createMPU param
* @param {string} params.Bucket - bucket name
* @param {string} params.Key - object key
* @param {string} params.Metadata - object Metadata
* @param {string} params.ContentType - Content-Type header
* @param {string} params.CacheControl - Cache-Control header
* @param {string} params.ContentDisposition - Content-Disposition header
* @param {string} params.ContentEncoding - Content-Encoding header
* @param {function} callback - callback function to call with the generated
* upload-id for MPU operations
* @return {undefined}
*/
function createMPU(params, callback) {
// As google cloud does not have a create MPU function,
// create an empty 'init' object that will temporarily store the
// object metadata and return an upload ID to mimic an AWS MPU
if (!params || !params.Bucket || !params.Key) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in createMultipartUpload', error);
return callback(error);
}
const uploadId = uuid().replace(/-/g, '');
const mpuParams = {
Bucket: params.Bucket,
Key: createMpuKey(params.Key, uploadId, 'init'),
Metadata: params.Metadata,
ContentType: params.ContentType,
CacheControl: params.CacheControl,
ContentDisposition: params.ContentDisposition,
ContentEncoding: params.ContentEncoding,
};
mpuParams.Metadata = getPutTagsMetadata(mpuParams.Metadata, params.Tagging);
return this.putObject(mpuParams, err => {
if (err) {
logHelper(logger, 'error', 'error in createMPU - putObject', err);
return callback(err);
}
return callback(null, { UploadId: uploadId });
});
}
module.exports = createMPU;

View File

@ -1,24 +0,0 @@
const async = require('async');
const { stripTags } = require('../GcpUtils');
function deleteObjectTagging(params, callback) {
return async.waterfall([
next => this.headObject({
Bucket: params.Bucket,
Key: params.Key,
VersionId: params.VersionId,
}, next),
(resObj, next) => {
const completeMD = stripTags(resObj.Metadata);
this.copyObject({
Bucket: params.Bucket,
Key: params.Key,
CopySource: `${params.Bucket}/${params.Key}`,
Metadata: completeMD,
MetadataDirective: 'REPLACE',
}, next);
},
], callback);
}
module.exports = deleteObjectTagging;

View File

@ -1,19 +0,0 @@
const { retrieveTags } = require('../GcpUtils');
function getObjectTagging(params, callback) {
const headParams = {
Bucket: params.Bucket,
Key: params.Key,
VersionId: params.VersionId,
};
this.headObject(headParams, (err, res) => {
const TagSet = retrieveTags(res.Metadata);
const retObj = {
VersionId: res.VersionId,
TagSet,
};
return callback(null, retObj);
});
}
module.exports = getObjectTagging;

View File

@ -1,14 +0,0 @@
module.exports = {
// mpu functions
abortMultipartUpload: require('./abortMPU'),
completeMultipartUpload: require('./completeMPU'),
createMultipartUpload: require('./createMPU'),
listParts: require('./listParts'),
uploadPart: require('./uploadPart'),
uploadPartCopy: require('./uploadPartCopy'),
// object tagging
putObject: require('./putObject'),
putObjectTagging: require('./putTagging'),
getObjectTagging: require('./getTagging'),
deleteObjectTagging: require('./deleteTagging'),
};

View File

@ -1,42 +0,0 @@
const { errors } = require('arsenal');
const { createMpuKey, logger } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* listParts - list uploaded MPU parts
* @param {object} params - listParts param
* @param {string} params.Bucket - bucket name
* @param {string} params.Key - object key
* @param {string} params.UploadId - MPU upload id
* @param {function} callback - callback function to call with the list of parts
* @return {undefined}
*/
function listParts(params, callback) {
if (!params || !params.UploadId || !params.Bucket || !params.Key) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in listParts', error);
return callback(error);
}
if (params.PartNumberMarker && params.PartNumberMarker < 0) {
return callback(errors.InvalidArgument
.customizeDescription('The request specified an invalid marker'));
}
const mpuParams = {
Bucket: params.Bucket,
Prefix: createMpuKey(params.Key, params.UploadId, 'parts'),
Marker: createMpuKey(params.Key, params.UploadId,
params.PartNumberMarker, 'parts'),
MaxKeys: params.MaxParts,
};
return this.listObjects(mpuParams, (err, res) => {
if (err) {
logHelper(logger, 'error',
'error in listParts - listObjects', err);
return callback(err);
}
return callback(null, res);
});
}
module.exports = listParts;

View File

@ -1,316 +0,0 @@
const async = require('async');
const Backoff = require('backo');
const { eachSlice, createMpuKey, createMpuList, logger } =
require('../GcpUtils');
const { logHelper } = require('../../utils');
const { createAggregateETag } =
require('../../../../api/apiUtils/object/processMpuParts');
const BACKOFF_PARAMS = { min: 1000, max: 300000, jitter: 0.1, factor: 1.5 };
class MpuHelper {
constructor(service, options = {}) {
this.service = service;
this.backoffParams = {
min: options.min || BACKOFF_PARAMS.min,
max: options.max || BACKOFF_PARAMS.max,
jitter: options.jitter || BACKOFF_PARAMS.jitter,
factor: options.factor || BACKOFF_PARAMS.factor,
};
}
_retry(fnName, params, callback) {
const backoff = new Backoff(this.backoffParams);
const handleFunc = (fnName, params, retry, callback) => {
const timeout = backoff.duration();
return setTimeout((params, cb) =>
this.service[fnName](params, cb), timeout, params,
(err, res) => {
if (err) {
if (err.statusCode === 429 || err.code === 429) {
if (fnName === 'composeObject') {
logger.trace('composeObject: slow down request',
{ retryCount: retry, timeout });
} else if (fnName === 'copyObject') {
logger.trace('copyObject: slow down request',
{ retryCount: retry, timeout });
}
return handleFunc(
fnName, params, retry + 1, callback);
}
logHelper(logger, 'error', `${fnName} failed`, err);
return callback(err);
}
backoff.reset();
return callback(null, res);
});
};
handleFunc(fnName, params, 0, callback);
}
/**
* retryCompose - exponential backoff retry implementation for the compose
* operation
* @param {object} params - compose object params
* @param {function} callback - callback function to call with the result
* of the compose operation
* @return {undefined}
*/
retryCompose(params, callback) {
this._retry('composeObject', params, callback);
}
/**
* retryCopy - exponential backoff retry implementation for the copy
* operation
* @param {object} params - copy object params
* @param {function} callback - callback function to call with the result
* of the copy operation
* @return {undefined}
*/
retryCopy(params, callback) {
this._retry('copyObject', params, callback);
}
/**
* splitMerge - breaks down the MPU list of parts to be compose on GCP;
* splits partList into chunks of 32 objects, the limit of each compose
* operation.
* @param {object} params - complete MPU params
* @param {string} params.Bucket - bucket name
* @param {string} params.MPU - mpu bucket name
* @param {string} params.Key - object key
* @param {string} params.UploadId - MPU upload id
* @param {object[]} partList - list of parts for complete multipart upload
* @param {string} level - the phase name of the MPU process
* @param {function} callback - the callback function to call
* @return {undefined}
*/
splitMerge(params, partList, level, callback) {
// create composition of slices from the partList array
return async.mapLimit(eachSlice.call(partList, 32),
this.service._maxConcurrent,
(infoParts, cb) => {
const mpuPartList = infoParts.Parts.map(item =>
({ PartName: item.PartName }));
const partNumber = infoParts.PartNumber;
const tmpKey =
createMpuKey(params.Key, params.UploadId, partNumber, level);
const mergedObject = { PartName: tmpKey };
if (mpuPartList.length < 2) {
logger.trace(
'splitMerge: parts are fewer than 2, copy instead');
// else just perform a copy
const copyParams = {
Bucket: params.MPU,
Key: tmpKey,
CopySource: `${params.MPU}/${mpuPartList[0].PartName}`,
};
return this.service.copyObject(copyParams, (err, res) => {
if (err) {
logHelper(logger, 'error',
'error in splitMerge - copyObject', err);
return cb(err);
}
mergedObject.VersionId = res.VersionId;
mergedObject.ETag = res.ETag;
return cb(null, mergedObject);
});
}
const composeParams = {
Bucket: params.MPU,
Key: tmpKey,
MultipartUpload: { Parts: mpuPartList },
};
return this.retryCompose(composeParams, (err, res) => {
if (err) {
return cb(err);
}
mergedObject.VersionId = res.VersionId;
mergedObject.ETag = res.ETag;
return cb(null, mergedObject);
});
}, (err, res) => {
if (err) {
return callback(err);
}
return callback(null, res.length);
});
}
/**
* removeParts - remove all objects created to perform a multipart upload
* @param {object} params - remove parts params
* @param {string} params.Bucket - bucket name
* @param {string} params.MPU - mpu bucket name
* @param {string} params.Key - object key
* @param {string} params.UploadId - MPU upload id
* @param {function} callback - callback function to call
* @return {undefined}
*/
removeParts(params, callback) {
const _getObjectVersions = callback => {
logger.trace('remove all parts from mpu bucket');
let partsList = [];
let isTruncated = true;
let nextMarker;
return async.whilst(() => isTruncated, next => {
const listParams = {
Bucket: params.MPU,
Prefix: params.Prefix,
Marker: nextMarker,
};
return this.service.listVersions(listParams, (err, res) => {
if (err) {
logHelper(logger, 'error', 'error in ' +
'removeParts - listVersions', err);
return next(err);
}
nextMarker = res.NextMarker;
isTruncated = res.IsTruncated;
partsList = partsList.concat(res.Versions);
return next();
});
}, err => callback(err, partsList));
};
const _deleteObjects = (partsList, callback) => {
logger.trace('successfully listed mpu parts', {
objectCount: partsList.length,
});
return async.eachLimit(partsList, 10, (obj, next) => {
const delParams = {
Bucket: params.MPU,
Key: obj.Key,
VersionId: obj.VersionId,
};
this.service.deleteObject(delParams, err => {
if (err) {
logHelper(logger, 'error',
'error deleting object', err);
return next(err);
}
return next();
});
}, err => callback(err));
};
return async.waterfall([
_getObjectVersions,
_deleteObjects,
], err => callback(err));
}
composeFinal(numParts, params, callback) {
// final compose:
// number of parts to compose <= 10
// perform final compose in mpu bucket
logger.trace('completeMultipartUpload: final compose');
const parts = createMpuList(params, 'compose', numParts);
const partList = parts.map(item => (
{ PartName: item.PartName }));
if (partList.length < 2) {
logger.trace(
'fewer than 2 parts, skip to copy phase');
return callback(null, partList[0].PartName);
}
const composeParams = {
Bucket: params.MPU,
Key: createMpuKey(params.Key, params.UploadId, 'final'),
MultipartUpload: { Parts: partList },
};
return this.retryCompose(composeParams, err => {
if (err) {
return callback(err);
}
return callback(null, null);
});
}
/*
* Create MPU Aggregate ETag
*/
generateMpuResult(res, partList, callback) {
const concatETag = partList.reduce((prev, curr) =>
prev + curr.ETag.substring(1, curr.ETag.length - 1), '');
const aggregateETag = createAggregateETag(concatETag, partList);
return callback(null, res, aggregateETag);
}
copyToMain(res, aggregateETag, params, callback) {
// move object from mpu bucket into the main bucket
// retrieve initial metadata then compose the object
const copySource = res ||
createMpuKey(params.Key, params.UploadId, 'final');
return async.waterfall([
next => {
// retrieve metadata from init object in mpu bucket
const headParams = {
Bucket: params.MPU,
Key: createMpuKey(params.Key, params.UploadId,
'init'),
};
logger.trace('retrieving object metadata');
return this.service.headObject(headParams, (err, res) => {
if (err) {
logHelper(logger, 'error',
'error in createMultipartUpload - headObject',
err);
return next(err);
}
return next(null, res);
});
},
(res, next) => {
const metadata = res.Metadata;
// copy the final object into the main bucket
const copyMetadata = Object.assign({}, metadata);
copyMetadata['scal-etag'] = aggregateETag;
const copyParams = {
Bucket: params.Bucket,
Key: params.Key,
Metadata: copyMetadata,
MetadataDirective: 'REPLACE',
CopySource: `${params.MPU}/${copySource}`,
ContentType: res.ContentType,
CacheControl: res.CacheControl,
ContentEncoding: res.ContentEncoding,
ContentDisposition: res.ContentDisposition,
ContentLanguage: res.ContentLanguage,
};
logger.trace('copyParams', { copyParams });
this.retryCopy(copyParams, (err, res) => {
if (err) {
logHelper(logger, 'error', 'error in ' +
'createMultipartUpload - final copyObject',
err);
return next(err);
}
const mpuResult = {
Bucket: params.Bucket,
Key: params.Key,
VersionId: res.VersionId,
ETag: `"${aggregateETag}"`,
};
return this.service.headObject({
Bucket: params.Bucket,
Key: params.Key,
VersionId: res.VersionId,
}, (err, res) => {
if (err) {
logHelper(logger, 'error', 'error in ' +
'createMultipartUpload - final head object',
err);
return next(err);
}
mpuResult.ContentLength = res.ContentLength;
return next(null, mpuResult);
});
});
},
], callback);
}
}
module.exports = MpuHelper;

View File

@ -1,11 +0,0 @@
const { getPutTagsMetadata } = require('../GcpUtils');
function putObject(params, callback) {
const putParams = Object.assign({}, params);
putParams.Metadata = getPutTagsMetadata(putParams.Metadata, params.Tagging);
delete putParams.Tagging;
// error handling will be by the actual putObject request
return this.putObjectReq(putParams, callback);
}
module.exports = putObject;

View File

@ -1,33 +0,0 @@
const async = require('async');
const { errors } = require('arsenal');
const { processTagSet } = require('../GcpUtils');
function putObjectTagging(params, callback) {
if (!params.Tagging || !params.Tagging.TagSet) {
return callback(errors.MissingParameter);
}
const tagRes = processTagSet(params.Tagging.TagSet);
if (tagRes instanceof Error) {
return callback(tagRes);
}
return async.waterfall([
next => this.headObject({
Bucket: params.Bucket,
Key: params.Key,
VersionId: params.VersionId,
}, next),
(resObj, next) => {
const completeMD = Object.assign({}, resObj.Metadata, tagRes);
this.copyObject({
Bucket: params.Bucket,
Key: params.Key,
CopySource: `${params.Bucket}/${params.Key}`,
Metadata: completeMD,
MetadataDirective: 'REPLACE',
}, next);
},
], callback);
}
module.exports = putObjectTagging;

View File

@ -1,43 +0,0 @@
const { errors } = require('arsenal');
const { getPartNumber, createMpuKey, logger } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* uploadPart - upload part
* @param {object} params - upload part params
* @param {string} params.Bucket - bucket name
* @param {string} params.Key - object key
* @param {function} callback - callback function to call
* @return {undefined}
*/
function uploadPart(params, callback) {
if (!params || !params.UploadId || !params.Bucket || !params.Key) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in uploadPart', error);
return callback(error);
}
const partNumber = getPartNumber(params.PartNumber);
if (!partNumber) {
const error = errors.InvalidArgument
.customizeDescription('PartNumber is invalid');
logHelper(logger, 'debug', 'error in uploadPart', error);
return callback(error);
}
const mpuParams = {
Bucket: params.Bucket,
Key: createMpuKey(params.Key, params.UploadId, partNumber),
Body: params.Body,
ContentLength: params.ContentLength,
};
return this.putObjectReq(mpuParams, (err, res) => {
if (err) {
logHelper(logger, 'error',
'error in uploadPart - putObject', err);
return callback(err);
}
return callback(null, res);
});
}
module.exports = uploadPart;

View File

@ -1,37 +0,0 @@
const { errors } = require('arsenal');
const { getPartNumber, createMpuKey, logger } = require('../GcpUtils');
const { logHelper } = require('../../utils');
/**
* uploadPartCopy - upload part copy
* @param {object} params - upload part copy params
* @param {string} params.Bucket - bucket name
* @param {string} params.Key - object key
* @param {string} params.CopySource - source object to copy
* @param {function} callback - callback function to call
* @return {undefined}
*/
function uploadPartCopy(params, callback) {
if (!params || !params.UploadId || !params.Bucket || !params.Key ||
!params.CopySource) {
const error = errors.InvalidRequest
.customizeDescription('Missing required parameter');
logHelper(logger, 'error', 'error in uploadPartCopy', error);
return callback(error);
}
const partNumber = getPartNumber(params.PartNumber);
if (!partNumber) {
const error = errors.InvalidArgument
.customizeDescription('PartNumber is not a number');
logHelper(logger, 'debug', 'error in uploadPartCopy', error);
return callback(error);
}
const mpuParams = {
Bucket: params.Bucket,
Key: createMpuKey(params.Key, params.UploadId, partNumber),
CopySource: params.CopySource,
};
return this.copyObject(mpuParams, callback);
}
module.exports = uploadPartCopy;

View File

@ -1,402 +0,0 @@
const async = require('async');
const assert = require('assert');
const stream = require('stream');
const { errors } = require('arsenal');
const { minimumAllowedPartSize, gcpMaximumAllowedPartCount } =
require('../../../../constants');
const { createMpuList, logger } = require('./GcpUtils');
const { logHelper } = require('../utils');
function sliceFn(body, size) {
const array = [];
let partNumber = 1;
for (let ind = 0; ind < body.length; ind += size) {
array.push({
Body: body.slice(ind, ind + size),
PartNumber: partNumber++,
});
}
return array;
}
class GcpManagedUpload {
/**
* GcpMangedUpload - class to mimic the upload method in AWS-SDK
* To-Do: implement retry on failure like S3's upload
* @param {GcpService} service - client object
* @param {object} params - upload params
* @param {string} params.Bucket - bucket name
* @param {string} params.MPU - mpu bucket name
* @param {string} params.Key - object key
* @param {object} options - config setting for GcpManagedUpload object
* @param {number} options.partSize - set object chunk size
* @param {number} options.queueSize - set the number of concurrent upload
* @return {object} - return an GcpManagedUpload object
*/
constructor(service, params, options = {}) {
this.service = service;
this.params = params;
this.mainBucket =
this.params.Bucket || this.service.config.mainBucket;
this.mpuBucket =
this.params.MPU || this.service.config.mpuBucket;
this.partSize = minimumAllowedPartSize;
this.queueSize = options.queueSize || 4;
this.validateBody();
this.setPartSize();
// multipart information
this.parts = {};
this.uploadedParts = 0;
this.activeParts = 0;
this.partBuffers = [];
this.partQueue = [];
this.partBufferLength = 0;
this.totalChunkedBytes = 0;
this.partNumber = 0;
}
/**
* validateBody - validate that body contains data to upload. If body is not
* of type stream, it must then be of either string or buffer. If string,
* convert to a Buffer type and split into chunks if body is large enough
* @return {undefined}
*/
validateBody() {
this.body = this.params.Body;
assert(this.body, errors.MissingRequestBodyError.customizeDescription(
'Missing request body'));
this.totalBytes = this.params.ContentLength;
if (this.body instanceof stream) {
assert.strictEqual(typeof this.totalBytes, 'number',
errors.MissingContentLength.customizeDescription(
'If body is a stream, ContentLength must be provided'));
} else {
if (typeof this.body === 'string') {
this.body = Buffer.from(this.body);
}
this.totalBytes = this.body.byteLength;
assert(this.totalBytes, errors.InternalError.customizeDescription(
'Unable to perform upload'));
}
}
setPartSize() {
const newPartSize =
Math.ceil(this.totalBytes / gcpMaximumAllowedPartCount);
if (newPartSize > this.partSize) this.partSize = newPartSize;
this.totalParts = Math.ceil(this.totalBytes / this.partSize);
if (this.body instanceof Buffer && this.totalParts > 1) {
this.slicedParts = sliceFn(this.body, this.partSize);
}
}
/**
* cleanUp - function that is called if GcpManagedUpload fails at any point,
* perform clean up of used resources. Ends the request by calling an
* internal callback function
* @param {Error} err - Error object
* @return {undefined}
*/
cleanUp(err) {
// is only called when an error happens
if (this.failed || this.completed) {
return undefined;
}
this.failed = true;
if (this.uploadId) {
// if MPU was successfuly created
return this.abortMPU(mpuErr => {
if (mpuErr) {
logHelper(logger, 'error',
'GcpMangedUpload: abortMPU failed in cleanup', mpuErr);
}
return this.callback(err);
});
}
return this.callback(err);
}
/**
* abortMPU - function that is called to remove a multipart upload
* @param {function} callback - callback function to call to complete the
* upload
* @return {undefined}
*/
abortMPU(callback) {
const params = {
Bucket: this.mainBucket,
MPU: this.mpuBucket,
UploadId: this.uploadId,
Key: this.params.Key,
};
this.service.abortMultipartUpload(params, callback);
}
/**
* completeUpload - function that is called to to complete a multipart
* upload
* @param {function} callback - callback function to call to complete the
* upload
* @return {undefined}
*/
completeUpload() {
if (this.failed || this.completed) {
return undefined;
}
const params = {
Bucket: this.mainBucket,
MPU: this.mpuBucket,
Key: this.params.Key,
UploadId: this.uploadId,
MultipartUpload: {},
};
params.MultipartUpload.Parts =
createMpuList(params, 'parts', this.uploadedParts)
.map(item =>
Object.assign(item, { ETag: this.parts[item.PartNumber] }));
return this.service.completeMultipartUpload(params,
(err, res) => {
if (err) {
return this.cleanUp(err);
}
this.completed = true;
return this.callback(null, res);
});
}
/**
* send - function that is called to execute the method request
* @param {function} callback - callback function to be called and stored
* at the completion of the method
* @return {undefined}
*/
send(callback) {
if (this.called || this.callback) {
return undefined;
}
this.failed = false;
this.called = true;
this.callback = callback;
if (this.totalBytes <= this.partSize) {
return this.uploadSingle();
}
if (this.slicedParts) {
return this.uploadBufferSlices();
}
if (this.body instanceof stream) {
// stream type
this.body.on('error', err => this.cleanUp(err))
.on('readable', () => this.chunkStream())
.on('end', () => {
this.isDoneChunking = true;
this.chunkStream();
if (this.isDoneChunking && this.uploadedParts >= 1 &&
this.uploadedParts === this.totalParts) {
this.completeUpload();
}
});
}
return undefined;
}
/**
* uploadSingle - perform a regular put object upload if the object is
* small enough
* @return {undefined}
*/
uploadSingle() {
if (this.failed || this.completed) {
return undefined;
}
// use putObject to upload the single part object
const params = Object.assign({}, this.params);
params.Bucket = this.mainBucket;
delete params.MPU;
return this.service.putObject(params, (err, res) => {
if (err) {
return this.cleanUp(err);
}
// return results from a putObject request
this.completed = true;
return this.callback(null, res);
});
}
/**
* uploadBufferSlices - perform a multipart upload for body of type string
* or Buffer.
* @return {undefined}
*/
uploadBufferSlices() {
if (this.failed || this.completed) {
return undefined;
}
if (this.slicedParts.length <= 1 && this.totalParts) {
// there is only one part
return this.uploadSingle();
}
// multiple slices
return async.series([
// createMultipartUpload
next => {
const params = this.params;
params.Bucket = this.mpuBucket;
this.service.createMultipartUpload(params, (err, res) => {
if (!err) {
this.uploadId = res.UploadId;
}
return next(err);
});
},
next => async.eachLimit(this.slicedParts, this.queueSize,
(uploadPart, done) => {
const params = {
Bucket: this.mpuBucket,
Key: this.params.Key,
UploadId: this.uploadId,
Body: uploadPart.Body,
PartNumber: uploadPart.PartNumber,
};
this.service.uploadPart(params, (err, res) => {
if (!err) {
this.parts[uploadPart.PartNumber] = res.ETag;
this.uploadedParts++;
}
return done(err);
});
}, next),
], err => {
if (err) {
return this.cleanUp(new Error(
'GcpManagedUpload: unable to complete upload'));
}
return this.completeUpload();
});
}
/**
* chunkStream - read stream up until the max chunk size then call an
* uploadPart method on that chunk. If more than chunk size has be read,
* move the extra bytes into a queue for the next read.
* @return {undefined}
*/
chunkStream() {
const buf = this.body.read(this.partSize - this.partBufferLength) ||
this.body.read();
if (buf) {
this.partBuffers.push(buf);
this.partBufferLength += buf.length;
this.totalChunkedBytes += buf.length;
}
let pbuf;
if (this.partBufferLength >= this.partSize) {
pbuf = Buffer.concat(this.partBuffers);
this.partBuffers = [];
this.partBufferLength = 0;
if (pbuf.length > this.partSize) {
const rest = pbuf.slice(this.partSize);
this.partBuffers.push(rest);
this.partBufferLength += rest.length;
pbuf = pbuf.slice(0, this.partSize);
}
this.processChunk(pbuf);
}
// when chunking the last part
if (this.isDoneChunking && !this.completeChunking) {
this.completeChunking = true;
pbuf = Buffer.concat(this.partBuffers);
this.partBuffers = [];
this.partBufferLength = 0;
if (pbuf.length > 0) {
this.processChunk(pbuf);
} else {
if (this.uploadedParts === 0) {
// this is a 0-byte object
this.uploadSingle();
}
}
}
this.body.read(0);
}
/**
* processChunk - create a multipart request if one does not exist;
* otherwise, call uploadChunk to upload a chunk
* @param {Buffer} chunk - bytes to be uploaded
* @return {undefined}
*/
processChunk(chunk) {
const partNumber = ++this.partNumber;
if (!this.uploadId) {
// if multipart upload does not exist
if (!this.multipartReq) {
const params = this.params;
params.Bucket = this.mpuBucket;
this.multipartReq =
this.service.createMultipartUpload(params, (err, res) => {
if (err) {
return this.cleanUp(err);
}
this.uploadId = res.UploadId;
this.uploadChunk(chunk, partNumber);
if (this.partQueue.length > 0) {
this.partQueue.forEach(
part => this.uploadChunk(...part));
}
return undefined;
});
} else {
this.partQueue.push([chunk, partNumber]);
}
} else {
// queues chunks for upload
this.uploadChunk(chunk, partNumber);
this.activeParts++;
if (this.activeParts < this.queueSize) {
this.chunkStream();
}
}
}
/**
* uploadChunk - perform the partUpload
* @param {Buffer} chunk - bytes to be uploaded
* @param {number} partNumber - upload object part number
* @return {undefined}
*/
uploadChunk(chunk, partNumber) {
if (this.failed || this.completed) {
return undefined;
}
const params = {
Bucket: this.mpuBucket,
Key: this.params.Key,
UploadId: this.uploadId,
PartNumber: partNumber,
Body: chunk,
ContentLength: chunk.length,
};
return this.service.uploadPart(params, (err, res) => {
if (err) {
return this.cleanUp(err);
}
this.parts[partNumber] = res.ETag;
this.uploadedParts++;
this.activeParts--;
if (this.totalParts === this.uploadedParts &&
this.isDoneChunking) {
return this.completeUpload();
}
return this.chunkStream();
});
}
}
module.exports = GcpManagedUpload;

View File

@ -1,141 +0,0 @@
const AWS = require('aws-sdk');
const { errors } = require('arsenal');
const Service = AWS.Service;
const GcpApis = require('./GcpApis');
const GcpServiceSetup = require('./GcpServiceSetup');
const GcpManagedUpload = require('./GcpManagedUpload');
AWS.apiLoader.services.gcp = {};
const GCP = Service.defineService('gcp', ['2017-11-01']);
Object.assign(GCP.prototype, GcpServiceSetup, {
_maxConcurrent: 5,
// Implemented APIs
// Bucket API
getBucket(params, callback) {
return this.listObjects(params, callback);
},
// Object APIs
upload(params, callback) {
try {
const uploader = new GcpManagedUpload(this, params);
return uploader.send(callback);
} catch (err) {
return callback(err);
}
},
putObjectCopy(params, callback) {
return this.copyObject(params, callback);
},
// TO-DO: Implement the following APIs
// Service API
listBuckets(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: listBuckets not implemented'));
},
// Bucket APIs
getBucketLocation(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketLocation not implemented'));
},
deleteBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucket not implemented'));
},
listObjectVersions(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: listObjectVersions not implemented'));
},
createBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: createBucket not implemented'));
},
putBucket(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucket not implemented'));
},
getBucketAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketAcl not implemented'));
},
putBucketAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketAcl not implemented'));
},
putBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketWebsite not implemented'));
},
getBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketWebsite not implemented'));
},
deleteBucketWebsite(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucketWebsite not implemented'));
},
putBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putBucketCors not implemented'));
},
getBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getBucketCors not implemented'));
},
deleteBucketCors(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteBucketCors not implemented'));
},
// Object APIs
putObjectTagging(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putObjectTagging not implemented'));
},
deleteObjectTagging(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: deleteObjectTagging not implemented'));
},
putObjectAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: putObjectAcl not implemented'));
},
getObjectAcl(params, callback) {
return callback(errors.NotImplemented
.customizeDescription('GCP: getObjectAcl not implemented'));
},
});
Object.assign(GCP.prototype, GcpApis);
Object.defineProperty(AWS.apiLoader.services.gcp, '2017-11-01', {
get: function get() {
const model = require('./gcp-2017-11-01.api.json');
return model;
},
enumerable: true,
configurable: true,
});
module.exports = GCP;

View File

@ -1,246 +0,0 @@
/**
* This file contains the private methods for the GCP service to form/sign
* Google Cloud Storage requests.
*
* It uses a modified version of the S3 service private methods to form/sign
* requests compatible with the Google Cloud Storage XML api.
*/
const AWS = require('aws-sdk');
const GcpSigner = require('./GcpSigner');
module.exports = {
/**
* GCP compatible request signer
* @return {AWS.Signer} AWS Signer
*/
getSignerClass() {
return GcpSigner;
},
/**
* set service region
* @return {undefined}
*/
validateService() {
if (!this.config.region) {
this.config.region = 'us-east-1';
}
},
/**
* setup listeners for building requests
* @param {AWS.Request} request - AWS request object
* @return {undefined}
*/
setupRequestListeners(request) {
request.addListener('validate', this.validateBucketEndpoint);
request.addListener('build', this.addContentType);
request.addListener('build', this.populateURI);
request.addListener('build', this.computeContentMd5);
request.addListener('extractError', this.extractRequestIds);
},
/**
* validate that when bucket endpoitn flag is set, root level apis are
* inaccessible
* @param {AWS.Request} req - AWS request object
* @returns {undefined}
* @api private
*/
validateBucketEndpoint(req) {
if (!req.params.Bucket && req.service.config.s3BucketEndpoint) {
const msg =
'Cannot send requests to root API with `s3BucketEndpoint` set.';
throw AWS.util.error(new Error(),
{ code: 'ConfigError', message: msg });
}
},
/**
* S3 prefers dns-compatible bucket names to be moved from the uri path
* to the hostname as a sub-domain. This is not possible, even for
* dns-compat buckets when using SSL and the bucket name contains a dot.
* The ssl wildcard certificate is only 1-level deep.
* @param {AWS.Request} req - AWS request object
* @returns {undefined}
*
* @api private
*/
populateURI(req) {
const httpRequest = req.httpRequest;
const b = req.params.Bucket;
const service = req.service;
const endpoint = httpRequest.endpoint;
if (b) {
if (!service.pathStyleBucketName(b)) {
if (!service.config.s3BucketEndpoint) {
endpoint.hostname = `${b}.${endpoint.hostname}`;
}
const port = endpoint.port;
if (port !== 80 && port !== 443) {
endpoint.host = `${endpoint.hostname}:${endpoint.port}`;
} else {
endpoint.host = endpoint.hostname;
}
// needed for signing the request
httpRequest.virtualHostedBucket = b;
service.removeVirtualHostedBucketFromPath(req);
}
}
},
/**
* Takes the bucket name out of the path if bucket is virtual-hosted
* @param {AWS.Request} req - AWS request object
* @returns {undefined}
*
* @api private
*/
removeVirtualHostedBucketFromPath(req) {
const httpRequest = req.httpRequest;
const bucket = httpRequest.virtualHostedBucket;
if (bucket && httpRequest.path) {
httpRequest.path =
httpRequest.path.replace(new RegExp(`/${bucket}`), '');
if (httpRequest.path[0] !== '/') {
httpRequest.path = `/${httpRequest.path}`;
}
}
},
/**
* Adds a default content type if none is supplied.
* @param {AWS.Request} req - AWS request object
* @returns {undefined}
*
* @api private
*/
addContentType(req) {
const httpRequest = req.httpRequest;
if (httpRequest.method === 'GET' || httpRequest.method === 'HEAD') {
// Content-Type is not set in GET/HEAD requests
delete httpRequest.headers['Content-Type'];
return;
}
// always have a Content-Type
if (!httpRequest.headers['Content-Type']) {
httpRequest.headers['Content-Type'] = 'application/octet-stream';
}
const contentType = httpRequest.headers['Content-Type'];
if (AWS.util.isBrowser()) {
if (typeof httpRequest.body === 'string' &&
!contentType.match(/;\s*charset=/)) {
const charset = '; charset=UTF-8';
httpRequest.headers['Content-Type'] += charset;
} else {
const replaceFn = (_, prefix, charsetName) =>
prefix + charsetName.toUpperCase();
httpRequest.headers['Content-Type'] =
contentType.replace(/(;\s*charset=)(.+)$/, replaceFn);
}
}
},
computableChecksumOperations: {
putBucketCors: true,
putBucketLifecycle: true,
putBucketLifecycleConfiguration: true,
putBucketTagging: true,
deleteObjects: true,
putBucketReplication: true,
},
/**
* Checks whether checksums should be computed for the request.
* If the request requires checksums to be computed, this will always
* return true, otherwise it depends on whether
* {AWS.Config.computeChecksums} is set.
* @param {AWS.Request} req - the request to check against
* @return {Boolean} whether to compute checksums for a request.
*
* @api private
*/
willComputeChecksums(req) {
if (this.computableChecksumOperations[req.operation]) return true;
if (!this.config.computeChecksums) return false;
// TODO: compute checksums for Stream objects
if (!AWS.util.Buffer.isBuffer(req.httpRequest.body) &&
typeof req.httpRequest.body !== 'string') {
return false;
}
return false;
},
/**
* A listener that computes the Content-MD5 and sets it in the header.
* @param {AWS.Request} req - AWS request object
* @returns {undefined}
*
* @see AWS.S3.willComputeChecksums
* @api private
*/
computeContentMd5(req) {
if (req.service.willComputeChecksums(req)) {
const md5 = AWS.util.crypto.md5(req.httpRequest.body, 'base64');
// eslint-disable-next-line no-param-reassign
req.httpRequest.headers['Content-MD5'] = md5;
}
},
/**
* Returns true if the bucket name should be left in the URI path for
* a request to S3. This function takes into account the current
* endpoint protocol (e.g. http or https).
* @param {string} bucketName - bucket name
* @returns {Boolean} whether request should use path style
*
* @api private
*/
pathStyleBucketName(bucketName) {
// user can force path style requests via the configuration
if (this.config.s3ForcePathStyle) return true;
if (this.config.s3BucketEndpoint) return false;
if (this.dnsCompatibleBucketName(bucketName)) {
return this.config.sslEnabled && bucketName.match(/\./);
}
return true; // not dns compatible names must always use path style
},
/**
* Returns true if the bucket name is DNS compatible. Buckets created
* outside of the classic region MUST be DNS compatible.
* @param {string} bucketName - bucket name
* @returns {Boolean} whether bucket name is dns compatible
*
* @api private
*/
dnsCompatibleBucketName(bucketName) {
const b = bucketName;
const domain = new RegExp(/^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$/);
const ipAddress = new RegExp(/(\d+\.){3}\d+/);
const dots = new RegExp(/\.\./);
return b.match(domain) && !b.match(ipAddress) && !b.match(dots);
},
/**
* Extracts GCP specific request ids from the http response.
* @param {object} resp - response object
* @returns {undefined}
* @api private
*/
extractRequestIds(resp) {
const requestId = resp.httpResponse.headers ?
resp.httpResponse.headers['x-guploader-uploadid'] : null;
if (resp.error) {
// eslint-disable-next-line no-param-reassign
resp.error.requestId = resp.requestId || requestId;
}
},
};

View File

@ -1,51 +0,0 @@
const url = require('url');
const qs = require('querystring');
const AWS = require('aws-sdk');
const werelogs = require('werelogs');
const { constructStringToSignV2 } = require('arsenal').auth.client;
const logger = new werelogs.Logger('GcpSigner');
function genQueryObject(uri) {
const queryString = url.parse(uri).query;
return qs.parse(queryString);
}
const GcpSigner = AWS.util.inherit(AWS.Signers.RequestSigner, {
constructor: function GcpSigner(request) {
AWS.Signers.RequestSigner.call(this, request);
},
addAuthorization: function addAuthorization(credentials, date) {
if (!this.request.headers['presigned-expires']) {
this.request.headers['x-goog-date'] = AWS.util.date.rfc822(date);
}
const signature =
this.sign(credentials.secretAccessKey, this.stringToSign());
const auth = `GOOG1 ${credentials.accessKeyId}: ${signature}`;
this.request.headers.Authorization = auth;
},
stringToSign: function stringToSign() {
const requestObject = {
url: this.request.path,
method: this.request.method,
host: this.request.endpoint.host,
headers: this.request.headers,
bucketName: this.request.virtualHostedBucket,
query: genQueryObject(this.request.path) || {},
};
requestObject.gotBucketNameFromHost =
requestObject.host.indexOf(this.request.virtualHostedBucket) >= 0;
const data = Object.assign({}, this.request.headers);
return constructStringToSignV2(requestObject, data, logger, 'GCP');
},
sign: function sign(secret, string) {
return AWS.util.crypto.hmac(secret, string, 'base64', 'sha1');
},
});
module.exports = GcpSigner;

View File

@ -1,169 +0,0 @@
const werelogs = require('werelogs');
const { errors, s3middleware } = require('arsenal');
const _config = require('../../../Config').config;
const { gcpTaggingPrefix } = require('../../../../constants');
werelogs.configure({
level: _config.log.logLevel,
dump: _config.log.dumpLevel,
});
const logger = new werelogs.Logger('gcpUtil');
function eachSlice(size) {
this.array = [];
let partNumber = 1;
for (let ind = 0; ind < this.length; ind += size) {
this.array.push({
Parts: this.slice(ind, ind + size),
PartNumber: partNumber++,
});
}
return this.array;
}
function getSourceInfo(CopySource) {
const source =
CopySource.startsWith('/') ? CopySource.slice(1) : CopySource;
const sourceArray = source.split(/\/(.+)/);
const sourceBucket = sourceArray[0];
const sourceObject = sourceArray[1];
return { sourceBucket, sourceObject };
}
function getPaddedPartNumber(number) {
return `000000${number}`.substr(-5);
}
function getPartNumber(number) {
if (isNaN(number)) {
return undefined;
}
if (typeof number === 'string') {
return parseInt(number, 10);
}
return number;
}
function createMpuKey(key, uploadId, partNumberArg, fileNameArg) {
let partNumber = partNumberArg;
let fileName = fileNameArg;
if (typeof partNumber === 'string' && fileName === undefined) {
fileName = partNumber;
partNumber = null;
}
const paddedNumber = getPaddedPartNumber(partNumber);
if (fileName && typeof fileName === 'string') {
// if partNumber is given, return a "full file path"
// else return a "directory path"
return partNumber ? `${key}-${uploadId}/${fileName}/${paddedNumber}` :
`${key}-${uploadId}/${fileName}`;
}
if (partNumber && typeof partNumber === 'number') {
// filename wasn't passed as an argument. Create default
return `${key}-${uploadId}/parts/${paddedNumber}`;
}
// returns a "directory path"
return `${key}-${uploadId}/`;
}
function createMpuList(params, level, size) {
// populate and return a parts list for compose
const retList = [];
for (let i = 1; i <= size; ++i) {
retList.push({
PartName: createMpuKey(params.Key, params.UploadId, i, level),
PartNumber: i,
});
}
return retList;
}
function processTagSet(tagSet = []) {
if (tagSet.length > 10) {
return errors.BadRequest
.customizeDescription('Object tags cannot be greater than 10');
}
let error = undefined;
const tagAsMeta = {};
const taggingDict = {};
tagSet.every(tag => {
const { Key: key, Value: value } = tag;
if (key.length > 128) {
error = errors.InvalidTag
.customizeDescription(
`The TagKey provided is too long, ${key.length}`);
return false;
}
if (value.length > 256) {
error = errors.InvalidTag
.customizeDescription(
`The TagValue provided is too long, ${value.length}`);
return false;
}
if (taggingDict[key]) {
error = errors.InvalidTag
.customizeDescription(
'Cannot provide multiple Tags with the same key');
return false;
}
tagAsMeta[`${gcpTaggingPrefix}${key}`] = value;
taggingDict[key] = true;
return true;
});
if (error) {
return error;
}
return tagAsMeta;
}
function stripTags(metadata = {}) {
const retMD = Object.assign({}, metadata);
Object.keys(retMD).forEach(key => {
if (key.startsWith(gcpTaggingPrefix)) {
delete retMD[key];
}
});
return retMD;
}
function retrieveTags(metadata = {}) {
const retTagSet = [];
Object.keys(metadata).forEach(key => {
if (key.startsWith(gcpTaggingPrefix)) {
retTagSet.push({
Key: key.slice(gcpTaggingPrefix.length),
Value: metadata[key],
});
}
});
return retTagSet;
}
function getPutTagsMetadata(metadata, tagging = '') {
let retMetadata = metadata || {};
retMetadata = stripTags(retMetadata);
const tagObj = s3middleware.tagging.parseTagFromQuery(tagging);
Object.keys(tagObj).forEach(header => {
const prefixed = `${gcpTaggingPrefix}${header}`.toLowerCase();
retMetadata[prefixed] = tagObj[header];
});
return retMetadata;
}
module.exports = {
// functions
eachSlice,
createMpuKey,
createMpuList,
getSourceInfo,
processTagSet,
stripTags,
retrieveTags,
getPutTagsMetadata,
getPartNumber,
// util objects
logger,
};

View File

@ -1,932 +0,0 @@
{
"version": "1.0",
"metadata": {
"apiVersion": "2017-11-01",
"checksumFormat": "md5",
"endpointPrefix": "storage",
"globalEndpoint": "storage.googleapi.com",
"protocol": "rest-xml",
"serviceAbbreviation": "GCP",
"serviceFullName": "Google Cloud Storage",
"signatureVersion": "s3",
"timestampFormat": "rfc822",
"uid": "gcp-2017-11-01"
},
"operations": {
"HeadBucket": {
"http": {
"method": "HEAD",
"requestUri": "/{Bucket}"
},
"input": {
"type": "structure",
"required": [
"Bucket"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
}
}
}
},
"listObjects": {
"http": {
"method": "GET",
"requestUri": "/{Bucket}"
},
"input": {
"type": "structure",
"required": [
"Bucket"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"Delimiter": {
"location": "querystring",
"locationName": "delimiter"
},
"Marker": {
"location": "querystring",
"locationName": "marker"
},
"MaxKeys": {
"location": "querystring",
"locationName": "max-keys",
"type": "integer"
},
"Prefix": {
"location": "querystring",
"locationName": "prefix"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"IsTruncated": {
"type": "boolean"
},
"Marker": {},
"NextMarker": {},
"Contents": {
"shape": "ContentsShape"
},
"Name": {},
"Prefix": {},
"Delimiter": {},
"MaxKeys": {
"type": "integer"
},
"CommonPrefixes": {
"shape": "CommonPrefixShape"
}
}
}
},
"listVersions": {
"http": {
"method": "GET",
"requestUri": "/{Bucket}?versions"
},
"input": {
"type": "structure",
"required": [
"Bucket"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"Delimiter": {
"location": "querystring",
"locationName": "delimiter"
},
"Marker": {
"location": "querystring",
"locationName": "marker"
},
"MaxKeys": {
"location": "querystring",
"locationName": "max-keys",
"type": "integer"
},
"Prefix": {
"location": "querystring",
"locationName": "prefix"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"IsTruncated": {
"type": "boolean"
},
"Marker": {},
"NextMarker": {},
"Versions": {
"locationName": "Version",
"shape": "ContentsShape"
},
"Name": {},
"Prefix": {},
"Delimiter": {},
"MaxKeys": {
"type": "integer"
},
"CommonPrefixes": {
"shape": "CommonPrefixShape"
}
}
}
},
"PutBucketVersioning": {
"http": {
"method": "PUT",
"requestUri": "/{Bucket}?versioning"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"VersioningConfiguration"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"ContentMD5": {
"location": "header",
"locationName": "Content-MD5"
},
"VersioningConfiguration": {
"locationName": "VersioningConfiguration",
"type": "structure",
"members": {
"Status": {}
}
}
},
"payload": "VersioningConfiguration"
}
},
"GetBucketVersioning": {
"http": {
"method": "GET",
"requestUri": "/{Bucket}?versioning"
},
"input": {
"type": "structure",
"required": [
"Bucket"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
}
}
},
"output": {
"type": "structure",
"members": {
"Status": {}
}
}
},
"HeadObject": {
"http": {
"method": "HEAD",
"requestUri": "/{Bucket}/{Key+}"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"Key"
],
"members": {
"Date": {
"location": "header",
"locationName": "Date",
"type": "timestamp"
},
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"IfMatch": {
"location": "header",
"locationName": "If-Match"
},
"IfModifiedSince": {
"location": "header",
"locationName": "If-Modified-Since",
"type": "timestamp"
},
"IfNoneMatch": {
"location": "header",
"locationName": "If-None-Match"
},
"IfUnmodifiedSince": {
"location": "header",
"locationName": "If-Unmodified-Since",
"type": "timestamp"
},
"Range": {
"location": "header",
"locationName": "Range"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"Range": {
"location": "header",
"locationName": "Range"
},
"VersionId": {
"location": "querystring",
"locationName": "generation"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"Date": {
"location": "header",
"locationName": "Date",
"type": "timestamp"
},
"AcceptRanges": {
"location": "header",
"locationName": "accept-ranges"
},
"Expiration": {
"location": "header",
"locationName": "x-goog-expiration"
},
"LastModified": {
"location": "header",
"locationName": "Last-Modified",
"type": "timestamp"
},
"ContentLength": {
"location": "header",
"locationName": "Content-Length",
"type": "long"
},
"ContentHash": {
"location": "header",
"locationName": "x-goog-hash"
},
"ETag": {
"location": "header",
"locationName": "ETag"
},
"VersionId": {
"location": "header",
"locationName": "x-goog-generation"
},
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
},
"CacheControl": {
"location": "header",
"locationName": "Cache-Control"
},
"ContentDisposition": {
"location": "header",
"locationName": "Content-Disposition"
},
"ContentEncoding": {
"location": "header",
"locationName": "Content-Encoding"
},
"ContentLanguage": {
"location": "header",
"locationName": "Content-Language"
},
"ContentType": {
"location": "header",
"locationName": "Content-Type"
},
"Expires": {
"location": "header",
"locationName": "Expires",
"type": "timestamp"
},
"Metadata": {
"shape": "MetadataShape",
"location": "headers",
"locationName": "x-goog-meta-"
},
"StorageClass": {
"location": "headers",
"locationName": "x-goog-storage-class"
}
}
}
},
"PutObjectReq": {
"http": {
"method": "PUT",
"requestUri": "/{Bucket}/{Key+}"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"Key"
],
"members": {
"Date": {
"location": "header",
"locationName": "Date",
"type": "timestamp"
},
"ACL": {
"location": "header",
"locationName": "x-goog-acl"
},
"Body": {
"streaming": true,
"type": "blob"
},
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"CacheControl": {
"location": "header",
"locationName": "Cache-Control"
},
"ContentDisposition": {
"location": "header",
"locationName": "Content-Disposition"
},
"ContentEncoding": {
"location": "header",
"locationName": "Content-Encoding"
},
"ContentLanguage": {
"location": "header",
"locationName": "Content-Language"
},
"ContentLength": {
"location": "header",
"locationName": "Content-Length",
"type": "long"
},
"ContentMD5": {
"location": "header",
"locationName": "Content-MD5"
},
"ContentType": {
"location": "header",
"locationName": "Content-Type"
},
"Expires": {
"location": "header",
"locationName": "Expires",
"type": "timestamp"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"Metadata": {
"shape": "MetadataShape",
"location": "headers",
"locationName": "x-goog-meta-"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
},
"payload": "Body"
},
"output": {
"type": "structure",
"members": {
"Expiration": {
"location": "header",
"locationName": "x-goog-expiration"
},
"ETag": {
"location": "header",
"locationName": "ETag"
},
"ContentHash": {
"location": "header",
"locationName": "x-goog-hash"
},
"VersionId": {
"location": "header",
"locationName": "x-goog-generation"
},
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
}
}
}
},
"GetObject": {
"http": {
"method": "GET",
"requestUri": "/{Bucket}/{Key+}"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"Key"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"IfMatch": {
"location": "header",
"locationName": "If-Match"
},
"IfModifiedSince": {
"location": "header",
"locationName": "If-Modified-Since",
"type": "timestamp"
},
"IfNoneMatch": {
"location": "header",
"locationName": "If-None-Match"
},
"IfUnmodifiedSince": {
"location": "header",
"locationName": "If-Unmodified-Since",
"type": "timestamp"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"Range": {
"location": "header",
"locationName": "Range"
},
"ResponseCacheControl": {
"location": "querystring",
"locationName": "response-cache-control"
},
"ResponseContentDisposition": {
"location": "querystring",
"locationName": "response-content-disposition"
},
"ResponseContentEncoding": {
"location": "querystring",
"locationName": "response-content-encoding"
},
"ResponseContentLanguage": {
"location": "querystring",
"locationName": "response-content-language"
},
"ResponseContentType": {
"location": "querystring",
"locationName": "response-content-type"
},
"ResponseExpires": {
"location": "querystring",
"locationName": "response-expires",
"type": "timestamp"
},
"VersionId": {
"location": "querystring",
"locationName": "generation"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"Body": {
"streaming": true,
"type": "blob"
},
"AcceptRanges": {
"location": "header",
"locationName": "accept-ranges"
},
"Expiration": {
"location": "header",
"locationName": "x-goog-expiration"
},
"LastModified": {
"location": "header",
"locationName": "Last-Modified",
"type": "timestamp"
},
"ContentLength": {
"location": "header",
"locationName": "Content-Length",
"type": "long"
},
"ETag": {
"location": "header",
"locationName": "ETag"
},
"VersionId": {
"location": "header",
"locationName": "x-goog-generation"
},
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
},
"CacheControl": {
"location": "header",
"locationName": "Cache-Control"
},
"ContentDisposition": {
"location": "header",
"locationName": "Content-Disposition"
},
"ContentEncoding": {
"location": "header",
"locationName": "Content-Encoding"
},
"ContentLanguage": {
"location": "header",
"locationName": "Content-Language"
},
"ContentRange": {
"location": "header",
"locationName": "Content-Range"
},
"ContentType": {
"location": "header",
"locationName": "Content-Type"
},
"ContentHash": {
"location": "header",
"locationName": "x-goog-hash"
},
"Expires": {
"location": "header",
"locationName": "Expires",
"type": "timestamp"
},
"WebsiteRedirectLocation": {
"location": "header",
"locationName": "x-goog-website-redirect-location"
},
"ServerSideEncryption": {
"location": "header",
"locationName": "x-goog-server-side-encryption"
},
"Metadata": {
"shape": "MetadataShape",
"location": "headers",
"locationName": "x-goog-meta-"
},
"StorageClass": {
"location": "header",
"locationName": "x-goog-storage-class"
}
},
"payload": "Body"
}
},
"DeleteObject": {
"http": {
"method": "DELETE",
"requestUri": "/{Bucket}/{Key+}"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"Key"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"VersionId": {
"location": "querystring",
"locationName": "generation"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"VersionId": {
"location": "header",
"locationName": "x-goog-generation"
}
}
}
},
"ComposeObject": {
"http": {
"method": "PUT",
"requestUri": "/{Bucket}/{Key+}?compose"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"Key"
],
"members": {
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"Source": {
"location": "header",
"locationName": "x-goog-copy-source"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"MetadataDirective": {
"location": "header",
"locationName": "x-goog-metadata-directive"
},
"ContentDisposition": {
"location": "header",
"locationName": "Content-Disposition"
},
"Content-Encoding": {
"location": "header",
"locationName": "Content-Encoding"
},
"MultipartUpload": {
"locationName": "ComposeRequest",
"type": "structure",
"members": {
"Parts": {
"locationName": "Component",
"type": "list",
"member": {
"type": "structure",
"members": {
"PartName": {
"locationName": "Name"
}
}
},
"flattened": true
}
}
},
"Metadata": {
"shape": "MetadataShape",
"location": "headers",
"locationName": "x-goog-meta-"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
},
"payload": "MultipartUpload"
},
"output": {
"type": "structure",
"members": {
"Expiration": {
"location": "header",
"locationName": "x-goog-expiration"
},
"ETag": {},
"VersioId": {
"location": "header",
"locationName": "x-goog-generation"
},
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
}
}
}
},
"CopyObject": {
"http": {
"method": "PUT",
"requestUri": "/{Bucket}/{Key+}"
},
"input": {
"type": "structure",
"required": [
"Bucket",
"CopySource",
"Key"
],
"members": {
"ACL": {
"location": "header",
"locationName": "x-goog-acl"
},
"Bucket": {
"location": "uri",
"locationName": "Bucket"
},
"CacheControl": {
"location": "header",
"locationName": "Cache-Control"
},
"ContentDisposition": {
"location": "header",
"locationName": "Content-Disposition"
},
"ContentEncoding": {
"location": "header",
"locationName": "Content-Encoding"
},
"ContentLanguage": {
"location": "header",
"locationName": "Content-Language"
},
"ContentType": {
"location": "header",
"locationName": "Content-Type"
},
"CopySource": {
"location": "header",
"locationName": "x-goog-copy-source"
},
"CopySourceIfMatch": {
"location": "header",
"locationName": "x-goog-copy-source-if-match"
},
"CopySourceIfModifiedSince": {
"location": "header",
"locationName": "x-goog-copy-source-if-modified-since"
},
"CopySourceIfNoneMatch": {
"location": "header",
"locationName": "x-goog-copy-source-if-none-match"
},
"CopySourceIfUnmodifiedSince": {
"location": "header",
"locationName": "x-goog-copy-source-if-unmodified-since",
"type": "timestamp"
},
"Expires": {
"location": "header",
"locationName": "Expires",
"type": "timestamp"
},
"Key": {
"location": "uri",
"locationName": "Key"
},
"Metadata": {
"shape": "MetadataShape",
"location": "headers",
"locationName": "x-goog-meta-"
},
"MetadataDirective": {
"location": "header",
"locationName": "x-goog-metadata-directive"
},
"ProjectId": {
"location": "header",
"locationName": "x-goog-project-id"
}
}
},
"output": {
"type": "structure",
"members": {
"CopyObjectResult": {
"type": "structure",
"members": {
"ETag": {},
"LastModified": {
"type": "timestamp"
}
}
},
"Expiration": {
"location": "header",
"locationName": "x-goog-expiration"
},
"ContentHash": {
"location": "header",
"locationName": "x-goog-hash"
},
"VersionId": {
"location": "header",
"locationName": "x-goog-generation"
},
"MetaVersionId": {
"location": "header",
"locationName": "x-goog-metageneration"
}
},
"payload": "CopyObjectResult"
}
}
},
"shapes": {
"MetadataShape": {
"type": "map",
"key": {},
"value": {}
},
"OwnerShape": {
"locationName": "Owner",
"type": "structure",
"members": {
"ID": {},
"Name": {}
}
},
"ContentsShape": {
"type": "list",
"member": {
"type": "structure",
"members": {
"Key": {},
"LastModified": {
"type": "timestamp"
},
"ETag": {},
"Size": {
"type": "integer"
},
"StorageClass": {},
"Owner": {
"shape": "OwnerShape"
},
"VersionId": {
"locationName": "Generation"
}
}
},
"flattened": true
},
"CommonPrefixShape": {
"type": "list",
"member": {
"type": "structure",
"members": {
"Prefix": {}
}
},
"flattened": true
}
}
}

View File

@ -1,5 +0,0 @@
module.exports = {
GCP: require('./GcpService'),
GcpSigner: require('./GcpSigner'),
GcpUtils: require('./GcpUtils'),
};

View File

@ -1,302 +0,0 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');
const MD5Sum = s3middleware.MD5Sum;
const { GCP, GcpUtils } = require('./GCP');
const { createMpuKey } = GcpUtils;
const AwsClient = require('./AwsClient');
const { prepareStream } = require('../../api/apiUtils/object/prepareStream');
const { logHelper, removeQuotes } = require('./utils');
const { config } = require('../../Config');
const missingVerIdInternalError = errors.InternalError.customizeDescription(
'Invalid state. Please ensure versioning is enabled ' +
'in GCP for the location constraint and try again.');
/**
* Class representing a Google Cloud Storage backend object
* @extends AwsClient
*/
class GcpClient extends AwsClient {
/**
* constructor - creates a Gcp backend client object (inherits )
* @param {object} config - configuration object for Gcp Backend up
* @param {object} config.s3params - S3 configuration
* @param {string} config.bucketName - GCP bucket name
* @param {string} config.mpuBucket - GCP mpu bucket name
* @param {boolean} config.bucketMatch - bucket match flag
* @param {object} config.authParams - GCP service credentials
* @param {string} config.dataStoreName - locationConstraint name
* @param {booblean} config.serverSideEncryption - server side encryption
* flag
* @return {object} - returns a GcpClient object
*/
constructor(config) {
super(config);
this.clientType = 'gcp';
this.type = 'GCP';
this._gcpBucketName = config.bucketName;
this._mpuBucketName = config.mpuBucket;
this._createGcpKey = this._createAwsKey;
this._gcpParams = Object.assign(this._s3Params, {
mainBucket: this._gcpBucketName,
mpuBucket: this._mpuBucketName,
});
this._client = new GCP(this._gcpParams);
// reassign inherited list parts method from AWS to trigger
// listing using S3 metadata part list instead of request to GCP
this.listParts = undefined;
}
toObjectGetInfo(objectKey, bucketName) {
return {
key: this._createGcpKey(bucketName, objectKey, this._bucketMatch),
dataStoreName: this._dataStoreName,
};
}
/**
* healthcheck - the gcp health requires checking multiple buckets:
* main and mpu buckets
* @param {string} location - location name
* @param {function} callback - callback function to call with the bucket
* statuses
* @return {undefined}
*/
healthcheck(location, callback) {
const checkBucketHealth = (bucket, cb) => {
let bucketResp;
this._client.headBucket({ Bucket: bucket }, err => {
if (err) {
bucketResp = {
gcpBucket: bucket,
error: err };
} else {
bucketResp = {
gcpBucket: bucket,
message: 'Congrats! You own the bucket',
};
}
return cb(null, bucketResp);
});
};
const gcpResp = {};
async.parallel({
main: done => checkBucketHealth(this._gcpBucketName, done),
mpu: done => checkBucketHealth(this._mpuBucketName, done),
}, (err, result) => {
if (err) {
return callback(errors.InternalFailure
.customizeDescription('Unable to perform health check'));
}
gcpResp[location] = result.main.error || result.mpu.error ?
{ error: true, external: true } : {};
Object.assign(gcpResp[location], result);
return callback(null, gcpResp);
});
}
createMPU(key, metaHeaders, bucketName, websiteRedirectHeader, contentType,
cacheControl, contentDisposition, contentEncoding, log, callback) {
const metaHeadersTrimmed = {};
Object.keys(metaHeaders).forEach(header => {
if (header.startsWith('x-amz-meta-')) {
const headerKey = header.substring(11);
metaHeadersTrimmed[headerKey] = metaHeaders[header];
}
});
const gcpKey = this._createGcpKey(bucketName, key, this._bucketMatch);
const params = {
Bucket: this._mpuBucketName,
Key: gcpKey,
Metadata: metaHeadersTrimmed,
ContentType: contentType,
CacheControl: cacheControl,
ContentDisposition: contentDisposition,
ContentEncoding: contentEncoding,
};
return this._client.createMultipartUpload(params, (err, mpuResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend',
err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`GCP: ${err.message}`)
);
}
return callback(null, mpuResObj);
});
}
completeMPU(jsonList, mdInfo, key, uploadId, bucketName, log, callback) {
const gcpKey = this._createGcpKey(bucketName, key, this._bucketMatch);
const partArray = [];
const partList = jsonList.Part;
for (let i = 0; i < partList.length; ++i) {
const partObj = partList[i];
if (!partObj.PartNumber || !partObj.ETag) {
return callback(errors.MalformedXML);
}
const number = partObj.PartNumber[0];
// check if the partNumber is an actual number throw an error
// otherwise
if (isNaN(number)) {
return callback(errors.MalformedXML);
}
const partNumber = parseInt(number, 10);
const partParams = {
PartName: createMpuKey(gcpKey, uploadId, partNumber),
PartNumber: partNumber,
ETag: partObj.ETag[0],
};
partArray.push(partParams);
}
const mpuParams = {
Bucket: this._gcpBucketName,
MPU: this._mpuBucketName,
Key: gcpKey,
UploadId: uploadId,
MultipartUpload: { Parts: partArray },
};
const completeObjData = { key: gcpKey };
return this._client.completeMultipartUpload(mpuParams,
(err, completeMpuRes) => {
if (err) {
logHelper(log, 'error', 'err from data backend on ' +
'completeMPU', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`GCP: ${err.message}`)
);
}
if (!completeMpuRes.VersionId) {
logHelper(log, 'error', 'missing version id for data ' +
'backend object', missingVerIdInternalError,
this._dataStoreName, this.clientType);
return callback(missingVerIdInternalError);
}
// remove quotes from eTag because they're added later
completeObjData.eTag = removeQuotes(completeMpuRes.ETag);
completeObjData.dataStoreVersionId = completeMpuRes.VersionId;
completeObjData.contentLength =
Number.parseInt(completeMpuRes.ContentLength, 10);
return callback(null, completeObjData);
});
}
uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) {
let hashedStream = stream;
if (request) {
const partStream = prepareStream(request, streamingV4Params,
log, callback);
hashedStream = new MD5Sum();
partStream.pipe(hashedStream);
}
const gcpKey = this._createGcpKey(bucketName, key, this._bucketMatch);
const params = {
Bucket: this._mpuBucketName,
Key: gcpKey,
UploadId: uploadId,
Body: hashedStream,
ContentLength: size,
PartNumber: partNumber };
return this._client.uploadPart(params, (err, partResObj) => {
if (err) {
logHelper(log, 'error', 'err from data backend ' +
'on uploadPart', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`GCP: ${err.message}`)
);
}
// remove quotes from eTag because they're added later
const noQuotesETag = removeQuotes(partResObj.ETag);
const dataRetrievalInfo = {
key: gcpKey,
dataStoreType: 'gcp',
dataStoreName: this._dataStoreName,
dataStoreETag: noQuotesETag,
};
return callback(null, dataRetrievalInfo);
});
}
uploadPartCopy(request, gcpSourceKey, sourceLocationConstraintName, log,
callback) {
const destBucketName = request.bucketName;
const destObjectKey = request.objectKey;
const destGcpKey = this._createGcpKey(destBucketName, destObjectKey,
this._bucketMatch);
const sourceGcpBucketName =
config.getGcpBucketNames(sourceLocationConstraintName).bucketName;
const uploadId = request.query.uploadId;
const partNumber = request.query.partNumber;
const copySourceRange = request.headers['x-amz-copy-source-range'];
if (copySourceRange) {
return callback(errors.NotImplemented
.customizeDescription('Error returned from ' +
`${this.clientType}: copySourceRange not implemented`)
);
}
const params = {
Bucket: this._mpuBucketName,
CopySource: `${sourceGcpBucketName}/${gcpSourceKey}`,
Key: destGcpKey,
UploadId: uploadId,
PartNumber: partNumber,
};
return this._client.uploadPartCopy(params, (err, res) => {
if (err) {
if (err.code === 'AccesssDenied') {
logHelper(log, 'error', 'Unable to access ' +
`${sourceGcpBucketName} GCP bucket`, err,
this._dataStoreName, this.clientType);
return callback(errors.AccessDenied
.customizeDescription('Error: Unable to access ' +
`${sourceGcpBucketName} GCP bucket`)
);
}
logHelper(log, 'error', 'error from data backend on ' +
'uploadPartCopy', err, this._dataStoreName);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`GCP: ${err.message}`)
);
}
// remove quotes from eTag because they're added later
const eTag = removeQuotes(res.CopyObjectResult.ETag);
return callback(null, eTag);
});
}
abortMPU(key, uploadId, bucketName, log, callback) {
const gcpKey = this._createGcpKey(bucketName, key, this._bucketMatch);
const getParams = {
Bucket: this._gcpBucketName,
MPU: this._mpuBucketName,
Key: gcpKey,
UploadId: uploadId,
};
return this._client.abortMultipartUpload(getParams, err => {
if (err) {
logHelper(log, 'error', 'err from data backend ' +
'on abortMPU', err, this._dataStoreName, this.clientType);
return callback(errors.ServiceUnavailable
.customizeDescription('Error returned from ' +
`GCP: ${err.message}`)
);
}
return callback();
});
}
}
module.exports = GcpClient;

View File

@ -1,149 +0,0 @@
const arsenal = require('arsenal');
const errors = arsenal.errors;
const createLogger = require('../multipleBackendLogger');
const { logHelper } = require('./utils');
class PfsClient {
constructor(config) {
const { host, port } = config.endpoint;
this.clientType = 'pfs';
this._bucketMatch = config.bucketMatch;
this._dataStoreName = config.dataStoreName;
this._restClient = new arsenal.network.rest.RESTClient({
host,
port,
isPassthrough: true,
});
}
setup(cb) {
return cb();
}
_createFsKey(requestBucketName, requestObjectKey, bucketMatch) {
if (bucketMatch) {
return requestObjectKey;
}
return `${requestBucketName}/${requestObjectKey}`;
}
toObjectGetInfo(objectKey, bucketName) {
return {
key: this._createFsKey(bucketName, objectKey, this._bucketMatch),
dataStoreName: this._dataStoreName,
};
}
put(stream, size, keyContext, reqUids, callback) {
const log = createLogger(reqUids);
if (keyContext.metaHeaders['x-amz-meta-mdonly'] === 'true') {
const b64 = keyContext.metaHeaders['x-amz-meta-md5chksum'];
let md5 = null;
if (b64) {
md5 = new Buffer(b64, 'base64').toString('hex');
}
return callback(null, keyContext.objectKey, '',
keyContext.metaHeaders['x-amz-meta-size'],
md5
);
}
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
get(objectGetInfo, range, reqUids, callback) {
const log = createLogger(reqUids);
this._restClient.get(objectGetInfo.key, range, reqUids, (err, rs) => {
if (err) {
logHelper(log, 'error', 'err from data backend', err,
this._dataStoreName, this.clientType);
return callback(err, null);
}
return callback(null, rs);
});
}
delete(objectGetInfo, reqUids, callback) {
const log = createLogger(reqUids);
const key = typeof objectGetInfo === 'string' ? objectGetInfo :
objectGetInfo.key;
this._restClient.delete(key, reqUids, err => {
if (err) {
logHelper(log, 'error', 'err from data backend', err,
this._dataStoreName, this.clientType);
return callback(err);
}
return callback();
});
}
// TODO: Implement a healthcheck
healthcheck(location, callback) {
const fsResp = {};
return callback(null, fsResp);
}
createMPU(key, metaHeaders, bucketName, websiteRedirectHeader, contentType,
cacheControl, contentDisposition, contentEncoding, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
uploadPart(request, streamingV4Params, stream, size, key, uploadId,
partNumber, bucketName, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
listParts(key, uploadId, bucketName, partNumberMarker, maxParts, log,
callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
completeMPU(jsonList, mdInfo, key, uploadId, bucketName, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
abortMPU(key, uploadId, bucketName, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
objectPutTagging(key, bucket, objectMD, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
objectDeleteTagging(key, bucket, objectMD, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
copyObject(request, destLocationConstraintName, sourceKey,
sourceLocationConstraintName, storeMetadataParams, log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
uploadPartCopy(request, awsSourceKey, sourceLocationConstraintName,
log, callback) {
logHelper(log, 'error', 'Not implemented', errors.NotImplemented,
this._dataStoreName, this.clientType);
return callback(errors.NotImplemented);
}
}
module.exports = PfsClient;

View File

@ -1,164 +0,0 @@
const async = require('async');
const constants = require('../../../constants');
const { config } = require('../../../lib/Config');
/* eslint-disable camelcase */
const backendHealth = {
aws_s3: {
response: undefined,
time: 0,
},
azure: {
response: undefined,
time: 0,
},
gcp: {
reponse: undefined,
time: 0,
},
};
/* eslint-enable camelcase */
const utils = {
logHelper(log, level, description, error, dataStoreName, backendType) {
const { message, name, requestId, extendedRequestId } = error;
log[level](description, {
error: message,
errorName: name,
dataStoreName,
backendType,
extRequestId: requestId,
extExtendedRequestId: extendedRequestId,
});
},
// take off the 'x-amz-meta-'
trimXMetaPrefix(obj) {
const newObj = {};
const metaObj = obj || {};
Object.keys(metaObj).forEach(key => {
const newKey = key.substring(11);
newObj[newKey] = metaObj[key];
});
return newObj;
},
removeQuotes(word) {
return word.slice(1, -1);
},
skipMpuPartProcessing(completeObjData) {
const backendType = completeObjData.dataStoreType;
if (constants.mpuMDStoredExternallyBackend[backendType]) {
return true;
}
return false;
},
/**
* checkAzureBackendMatch - checks that the external backend location for
* two data objects is the same and is Azure
* @param {array} objectDataOne - data of first object to compare
* @param {object} objectDataTwo - data of second object to compare
* @return {boolean} - true if both data backends are Azure, false if not
*/
checkAzureBackendMatch(objectDataOne, objectDataTwo) {
if (objectDataOne.dataStoreType === 'azure' &&
objectDataTwo.dataStoreType === 'azure') {
return true;
}
return false;
},
/**
* externalBackendCopy - Server side copy should only be allowed:
* 1) if source object and destination object are both on aws, both
* on azure, or both on gcp
* 2) if azure to azure, must be the same storage account since Azure
* copy outside of an account is async
* 3) if the source bucket is not an encrypted bucket and the
* destination bucket is not an encrypted bucket (unless the copy
* is all within the same bucket).
* @param {string} locationConstraintSrc - location constraint of the source
* @param {string} locationConstraintDest - location constraint of the
* destination
* @param {object} sourceBucketMD - metadata of the source bucket
* @param {object} destBucketMD - metadata of the destination bucket
* @return {boolean} - true if copying object from one
* externalbackend to the same external backend and for Azure if it is the
* same account since Azure copy outside of an account is async
*/
externalBackendCopy(locationConstraintSrc, locationConstraintDest,
sourceBucketMD, destBucketMD) {
const sourceBucketName = sourceBucketMD.getName();
const destBucketName = destBucketMD.getName();
const isSameBucket = sourceBucketName === destBucketName;
const bucketsNotEncrypted = destBucketMD.getServerSideEncryption()
=== sourceBucketMD.getServerSideEncryption() &&
sourceBucketMD.getServerSideEncryption() === null;
const sourceLocationConstraintType =
config.getLocationConstraintType(locationConstraintSrc);
const locationTypeMatch =
config.getLocationConstraintType(locationConstraintSrc) ===
config.getLocationConstraintType(locationConstraintDest);
return locationTypeMatch && (isSameBucket || bucketsNotEncrypted) &&
(sourceLocationConstraintType === 'aws_s3' ||
sourceLocationConstraintType === 'gcp' ||
(sourceLocationConstraintType === 'azure' &&
config.isSameAzureAccount(locationConstraintSrc,
locationConstraintDest)));
},
checkExternalBackend(clients, locations, type, flightCheckOnStartUp,
externalBackendHealthCheckInterval, cb) {
const checkStatus = backendHealth[type] || {};
if (locations.length === 0) {
return process.nextTick(cb, null, []);
}
if (!flightCheckOnStartUp && checkStatus.response &&
Date.now() - checkStatus.time
< externalBackendHealthCheckInterval) {
return process.nextTick(cb, null, checkStatus.response);
}
let locationsToCheck;
if (flightCheckOnStartUp) {
// check all locations if flight check on start up
locationsToCheck = locations;
} else {
const randomLocation = locations[Math.floor(Math.random() *
locations.length)];
locationsToCheck = [randomLocation];
}
return async.mapLimit(locationsToCheck, 5, (location, next) => {
const client = clients[location];
client.healthcheck(location, next, flightCheckOnStartUp);
}, (err, results) => {
if (err) {
return cb(err);
}
if (!flightCheckOnStartUp) {
checkStatus.response = results;
checkStatus.time = Date.now();
}
return cb(null, results);
});
},
translateAzureMetaHeaders(metaHeaders, tags) {
const translatedMetaHeaders = {};
if (tags) {
// tags are passed as string of format 'key1=value1&key2=value2'
const tagObj = {};
const tagArr = tags.split('&');
tagArr.forEach(keypair => {
const equalIndex = keypair.indexOf('=');
const key = keypair.substring(0, equalIndex);
tagObj[key] = keypair.substring(equalIndex + 1);
});
translatedMetaHeaders.tags = JSON.stringify(tagObj);
}
Object.keys(metaHeaders).forEach(headerName => {
const translated = headerName.substring(11).replace(/-/g, '_');
translatedMetaHeaders[translated] = metaHeaders[headerName];
});
return translatedMetaHeaders;
},
};
module.exports = utils;

View File

@ -3,16 +3,16 @@ const http = require('http');
const url = require('url');
const AWS = require('aws-sdk');
const Sproxy = require('sproxydclient');
const externalBackends = require('arsenal').storage.data.external;
const HttpsProxyAgent = require('https-proxy-agent');
const DataFileBackend = require('./file/backend');
const inMemory = require('./in_memory/backend').backend;
const AwsClient = require('./external/AwsClient');
const GcpClient = require('./external/GcpClient');
const AzureClient = require('./external/AzureClient');
const PfsClient = require('./external/PfsClient');
const proxyCompareUrl = require('./proxyCompareUrl');
const AwsClient = externalBackends.AwsClient;
const GcpClient = externalBackends.GcpClient;
const AzureClient = externalBackends.AzureClient;
const PfsClient = externalBackends.PfsClient;
const { proxyCompareUrl } = externalBackends.backendUtils;
const constants = require('../../constants');
const { config } = require('../Config');

View File

@ -1,17 +1,17 @@
const { errors, s3middleware } = require('arsenal');
const { errors, s3middleware, storage } = require('arsenal');
const { parseTagFromQuery } = s3middleware.tagging;
const createLogger = require('./multipleBackendLogger');
const async = require('async');
const { config } = require('../Config');
const parseLC = require('./locationConstraintParser');
const DataFileBackend = require('./file/backend');
const { checkExternalBackend } = require('./external/utils');
const metadata = require('../metadata/wrapper');
const { externalBackendHealthCheckInterval } = require('../../constants');
const locationStorageCheck =
require('../api/apiUtils/object/locationStorageCheck');
const { createLogger, checkExternalBackend } =
storage.data.external.backendUtils;
let clients = parseLC(config);
config.on('location-constraints-update', () => {
@ -78,7 +78,8 @@ const multipleBackendGateway = {
dataStoreMD5,
};
return callback(null, dataRetrievalInfo);
});
// send metadata as param for AzureClient in Arsenal
}, metadata);
},
head: (objectGetInfoArr, reqUids, callback) => {
@ -204,7 +205,7 @@ const multipleBackendGateway = {
return cb(err);
}
return client.uploadPart(request, streamingV4Params, stream,
size, key, uploadId, partNumber, bucketName, log,
size, key, uploadId, partNumber, bucketName, config, log,
(err, partInfo) => {
if (err) {
// if error putting part, counter should be decremented
@ -330,7 +331,7 @@ const multipleBackendGateway = {
const client = clients[location];
if (client.uploadPartCopy) {
return client.uploadPartCopy(request, awsSourceKey,
sourceLocationConstraintName,
sourceLocationConstraintName, config,
log, cb);
}
return cb(errors.NotImplemented.customizeDescription(

View File

@ -1,11 +0,0 @@
const werelogs = require('werelogs');
const logger = new werelogs.Logger('MultipleBackendGateway');
function createLogger(reqUids) {
return reqUids ?
logger.newRequestLoggerFromSerializedUids(reqUids) :
logger.newRequestLogger();
}
module.exports = createLogger;

View File

@ -1,53 +0,0 @@
/**
* proxyCompareUrl - compares request endpoint to urls in NO_PROXY env var
* @param {string} endpoint - url of request
* @return {bool} true if request endpoint matches no proxy, false if not
*/
function proxyCompareUrl(endpoint) {
const noProxy = process.env.NO_PROXY || process.env.no_proxy;
if (!noProxy) {
return false;
}
// noProxy env var is a comma separated list of urls not to proxy
const noProxyList = noProxy.split(',');
if (noProxyList.includes(endpoint)) {
return true;
}
const epArr = endpoint.split('.');
// reverse array to make comparison easier
epArr.reverse();
let match = false;
for (let j = 0; j < noProxyList.length; j++) {
const urlArr = noProxyList[j].split('.');
urlArr.reverse();
for (let i = 0; i < epArr.length; i++) {
if (epArr[i] === urlArr[i]) {
match = true;
} else if (urlArr[i] === '*' && i === (urlArr.length - 1)) {
// if first character of url is '*', remaining endpoint matches
match = true;
break;
} else if (urlArr[i] === '' && i === (urlArr.length - 1)) {
// if first character of url is '.', it is treated as wildcard
match = true;
break;
} else if (urlArr[i] === '*') {
match = true;
} else if (epArr[i] !== urlArr[i]) {
match = false;
break;
}
}
// if endpoint matches noProxy element, stop checking
if (match) {
break;
}
}
// if endpoint matches, request should not be proxied
if (match) {
return true;
}
return false;
}
module.exports = proxyCompareUrl;

View File

@ -1,22 +1,23 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');
const { errors, s3middleware, storage } = require('arsenal');
const PassThrough = require('stream').PassThrough;
const assert = require('assert');
const DataFileInterface = require('./file/backend');
const inMemory = require('./in_memory/backend').backend;
const multipleBackendGateway = require('./multipleBackendGateway');
const utils = require('./external/utils');
const { config } = require('../Config');
const MD5Sum = s3middleware.MD5Sum;
const NullStream = s3middleware.NullStream;
const assert = require('assert');
const kms = require('../kms/wrapper');
const externalBackends = require('../../constants').externalBackends;
const constants = require('../../constants');
const { BackendInfo } = require('../api/apiUtils/object/BackendInfo');
const locationStorageCheck =
require('../api/apiUtils/object/locationStorageCheck');
const RelayMD5Sum = require('../utilities/RelayMD5Sum');
const externalBackends = constants.externalBackends;
const MD5Sum = s3middleware.MD5Sum;
const NullStream = s3middleware.NullStream;
const { backendUtils } = storage.data.external;
const skipError = new Error('skip');
let CdmiData;
@ -478,7 +479,7 @@ const data = {
cb) => {
const serverSideEncryption = destBucketMD.getServerSideEncryption();
if (config.backends.data === 'multiple' &&
utils.externalBackendCopy(sourceLocationConstraintName,
backendUtils.externalBackendCopy(config, sourceLocationConstraintName,
storeMetadataParams.dataStoreName, sourceBucketMD, destBucketMD)) {
const destLocationConstraintName =
storeMetadataParams.dataStoreName;
@ -486,7 +487,7 @@ const data = {
const externalSourceKey = objectGetInfo.key;
return client.copyObject(request, destLocationConstraintName,
externalSourceKey, sourceLocationConstraintName,
storeMetadataParams, log, (error, objectRetrievalInfo) => {
storeMetadataParams, config, log, (error, objectRetrievalInfo) => {
if (error) {
return cb(error);
}

View File

@ -19,11 +19,10 @@
},
"homepage": "https://github.com/scality/S3#readme",
"dependencies": {
"arsenal": "github:scality/arsenal#84bf7bd",
"arsenal": "github:scality/arsenal#d7ca4c17",
"async": "~2.5.0",
"aws-sdk": "2.28.0",
"azure-storage": "^2.1.0",
"backo": "^1.1.0",
"bucketclient": "scality/bucketclient#5aa99d7",
"bufferutil": "^3.0.5",
"commander": "^2.9.0",

View File

@ -1,12 +1,12 @@
const async = require('async');
const assert = require('assert');
const arsenal = require('arsenal');
const withV4 = require('../../support/withV4');
const BucketUtility = require('../../../lib/utility/bucket-util');
const { describeSkipIfNotMultipleOrCeph, gcpClient, gcpBucketMPU, gcpLocation,
genUniqID } = require('../utils');
const { createMpuKey } =
require('../../../../../../lib/data/external/GCP').GcpUtils;
const { createMpuKey } = arsenal.storage.data.external.GcpUtils;
const bucket = `initmpugcp${genUniqID()}`;
const keyName = `somekey-${genUniqID()}`;

View File

@ -1,13 +1,13 @@
const assert = require('assert');
const async = require('async');
const arsenal = require('arsenal');
const withV4 = require('../../support/withV4');
const BucketUtility = require('../../../lib/utility/bucket-util');
const { describeSkipIfNotMultipleOrCeph, gcpClient, gcpBucket, gcpBucketMPU,
gcpLocation, gcpLocationMismatch, uniqName, genUniqID }
= require('../utils');
const { createMpuKey } =
require('../../../../../../lib/data/external/GCP').GcpUtils;
const { createMpuKey } = arsenal.storage.data.external.GcpUtils;
const keyObject = 'putgcp';
const bucket = `putpartgcp${genUniqID()}`;

View File

@ -108,7 +108,7 @@ callback) {
describeSkipIfNotMultiple('MultipleBackend object copy: AWS',
function testSuite() {
this.timeout(250000);
this.timeout(300000);
withV4(sigCfg => {
beforeEach(() => {
bucketUtil = new BucketUtility('default', sigCfg);
@ -142,8 +142,14 @@ function testSuite() {
afterEach(() => {
process.stdout.write('Emptying bucket\n');
return bucketUtil.empty(bucket)
.then(() => bucketUtil.empty(bucketAws))
.then(() => bucketUtil.empty(awsServerSideEncryptionbucket))
.then(() => {
process.stdout.write('Emptying aws bucket\n');
bucketUtil.empty(bucketAws);
})
.then(() => {
process.stdout.write('Empty aws encryption bucket\n');
bucketUtil.empty(awsServerSideEncryptionbucket);
})
.then(() => {
process.stdout.write(`Deleting bucket ${bucket}\n`);
return bucketUtil.deleteOne(bucket);

View File

@ -1,13 +1,13 @@
const assert = require('assert');
const crypto = require('crypto');
const { errors } = require('arsenal');
const { errors, storage } = require('arsenal');
const AWS = require('aws-sdk');
const uuid = require('uuid/v4');
const async = require('async');
const azure = require('azure-storage');
const { GCP } = require('../../../../../lib/data/external/GCP');
const { GCP } = storage.data.external;
const { getRealAwsConfig } = require('../support/awsConfig');
const { config } = require('../../../../../lib/Config');

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =
require('../../../../aws-node-sdk/test/support/awsConfig');

View File

@ -1,7 +1,8 @@
const assert = require('assert');
const async = require('async');
const arsenal = require('arsenal');
const xml2js = require('xml2js');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP, GcpUtils } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP, GcpUtils } = arsenal.storage.data.external;
const { gcpRequestRetry, setBucketClass, gcpMpuSetup, genUniqID } =
require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { gcpRequestRetry, setBucketClass, gcpMpuSetup, genUniqID } =
require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genDelTagObj, genUniqID } =
require('../../../utils/gcpUtils');

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genGetTagObj, genUniqID } =
require('../../../utils/gcpUtils');

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, setBucketClass, genUniqID } =
require('../../../utils/gcpUtils');

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genUniqID } = require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { makeGcpRequest } = require('../../../utils/makeRequest');
const { gcpRequestRetry, genPutTagObj, genUniqID } =
require('../../../utils/gcpUtils');

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const { GCP } = require('../../../../../../lib/data/external/GCP');
const arsenal = require('arsenal');
const { GCP } = arsenal.storage.data.external;
const { gcpRequestRetry, setBucketClass, genUniqID } =
require('../../../utils/gcpUtils');
const { getRealAwsConfig } =

View File

@ -1,11 +1,11 @@
const { auth } = require('arsenal');
const { auth, storage } = require('arsenal');
const http = require('http');
const https = require('https');
const querystring = require('querystring');
const conf = require('../../../../lib/Config').config;
const { GcpSigner } = require('../../../../lib/data/external/GCP');
const { GcpSigner } = storage.data.external;
const transport = conf.https ? https : http;
const ipAddress = process.env.IP ? process.env.IP : '127.0.0.1';

View File

@ -1,106 +0,0 @@
const assert = require('assert');
const AwsClient = require('../../../lib/data/external/AwsClient');
const GcpClient = require('../../../lib/data/external/GcpClient');
const AzureClient = require('../../../lib/data/external/AzureClient');
const DummyService = require('../DummyService');
const { DummyRequestLogger } = require('../helpers');
const backendClients = [
{
Class: AwsClient,
name: 'AwsClient',
config: {
s3Params: {},
bucketName: 'awsTestBucketName',
dataStoreName: 'awsDataStore',
serverSideEncryption: false,
type: 'aws',
},
},
{
Class: GcpClient,
name: 'GcpClient',
config: {
s3Params: {},
bucketName: 'gcpTestBucketName',
mpuBucket: 'gcpTestMpuBucketName',
dataStoreName: 'gcpDataStore',
type: 'gcp',
},
},
{
Class: AzureClient,
name: 'AzureClient',
config: {
azureStorageEndpoint: '',
azureStorageCredentials: {
storageAccountName: 'scality',
storageAccessKey: 'Zm9vCg==',
},
azureContainerName: 'azureTestBucketName',
dataStoreName: 'azureDataStore',
type: 'azure',
},
},
];
const log = new DummyRequestLogger();
describe('external backend clients', () => {
backendClients.forEach(backend => {
let testClient;
before(() => {
testClient = new backend.Class(backend.config);
testClient._client = new DummyService({ versioning: true });
});
if (backend.config.type !== 'azure') {
it(`${backend.name} completeMPU should return correctly ` +
'typed mpu results', done => {
const jsonList = {
Part: [
{
PartNumber: [1],
ETag: ['testpart0001etag'],
},
{
PartNumber: [2],
ETag: ['testpart0002etag'],
},
{
PartNumber: [3],
ETag: ['testpart0003etag'],
},
],
};
const key = 'externalBackendTestKey';
const bucketName = 'externalBackendTestBucket';
const uploadId = 'externalBackendTestUploadId';
testClient.completeMPU(jsonList, null, key,
uploadId, bucketName, log, (err, res) => {
assert.strictEqual(typeof res.key, 'string');
assert.strictEqual(typeof res.eTag, 'string');
assert.strictEqual(typeof res.dataStoreVersionId,
'string');
assert.strictEqual(typeof res.contentLength, 'number');
return done();
});
});
}
it(`${backend.name} toObjectGetInfo should return correct ` +
'objectGetInfo object', () => {
const key = 'externalBackendTestKey';
const bucketName = 'externalBackendTestBucket';
const objectGetInfo = testClient.toObjectGetInfo(key, bucketName);
assert.deepStrictEqual(objectGetInfo, {
// bucketMatch === false => expect bucket name to be
// prefixed to the backend key
key: 'externalBackendTestBucket/externalBackendTestKey',
dataStoreName: backend.config.dataStoreName,
});
});
// To-Do: test the other external client methods
});
});

View File

@ -1,194 +0,0 @@
const assert = require('assert');
const http = require('http');
const { GCP } = require('../../../lib/data/external/GCP');
const httpPort = 8888;
// test values
const host = 'localhost:8888';
const Bucket = 'testrequestbucket';
const Key = 'testRequestKey';
const MultipartUpload = { Parts: [{ PartName: 'part' }] };
const CopySource = 'copyBucket/copyKey';
const accessKeyId = 'accesskey';
const secretAccessKey = 'secretaccesskey';
function handler(isPathStyle) {
return (req, res) => {
if (isPathStyle) {
assert(req.headers.host, host);
assert(req.url.includes(Bucket));
} else {
assert(req.headers.host, `${Bucket}.${host}`);
assert(!req.url.includes(Bucket));
}
res.end();
};
}
const invalidBucketNames = [
'..',
'.bucketname',
'bucketname.',
'bucketName.',
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
'256.256.256.256',
'',
];
function badBucketNameHandler(req, res) {
assert(req.headers.host, host);
const bucketFromUrl = req.url.split('/')[1];
assert.strictEqual(typeof bucketFromUrl, 'string');
assert(invalidBucketNames.includes(bucketFromUrl));
res.end();
}
const operations = [
{
op: 'headBucket',
params: { Bucket },
},
{
op: 'listObjects',
params: { Bucket },
},
{
op: 'listVersions',
params: { Bucket },
},
{
op: 'getBucketVersioning',
params: { Bucket },
},
{
op: 'headObject',
params: { Bucket, Key },
},
{
op: 'putObject',
params: { Bucket, Key },
},
{
op: 'getObject',
params: { Bucket, Key },
},
{
op: 'deleteObject',
params: { Bucket, Key },
},
{
op: 'composeObject',
params: { Bucket, Key, MultipartUpload },
},
{
op: 'copyObject',
params: { Bucket, Key, CopySource },
},
];
describe('GcpService request behavior', function testSuite() {
this.timeout(120000);
let httpServer;
let client;
before(done => {
client = new GCP({
endpoint: `http://${host}`,
maxRetries: 0,
s3ForcePathStyle: false,
accessKeyId,
secretAccessKey,
});
httpServer =
http.createServer(badBucketNameHandler).listen(httpPort);
httpServer.on('listening', done);
httpServer.on('error', err => {
process.stdout.write(`https server: ${err.stack}\n`);
process.exit(1);
});
});
after('Terminating Server', () => {
httpServer.close();
});
invalidBucketNames.forEach(bucket => {
it(`should not use dns-style if bucket isn't dns compatible: ${bucket}`,
done => {
client.headBucket({ Bucket: bucket }, err => {
assert.ifError(err);
done();
});
});
});
});
describe('GcpService pathStyle tests', function testSuite() {
this.timeout(120000);
let httpServer;
let client;
before(done => {
client = new GCP({
endpoint: `http://${host}`,
maxRetries: 0,
s3ForcePathStyle: true,
accessKeyId,
secretAccessKey,
});
httpServer =
http.createServer(handler(true)).listen(httpPort);
httpServer.on('listening', done);
httpServer.on('error', err => {
process.stdout.write(`https server: ${err.stack}\n`);
process.exit(1);
});
});
after('Terminating Server', () => {
httpServer.close();
});
operations.forEach(test => it(`GCP::${test.op}`, done => {
client[test.op](test.params, err => {
assert.ifError(err);
done();
});
}));
});
describe('GcpService dnsStyle tests', function testSuite() {
this.timeout(120000);
let httpServer;
let client;
before(done => {
client = new GCP({
endpoint: `http://localhost:${httpPort}`,
maxRetries: 0,
s3ForcePathStyle: false,
accessKeyId,
secretAccessKey,
});
httpServer =
http.createServer(handler(false)).listen(httpPort);
httpServer.on('listening', done);
httpServer.on('error', err => {
process.stdout.write(`https server: ${err.stack}\n`);
process.exit(1);
});
});
after('Terminating Server', () => {
httpServer.close();
});
operations.forEach(test => it(`GCP::${test.op}`, done => {
client[test.op](test.params, err => {
assert.ifError(err);
done();
});
}));
});

View File

@ -1,7 +1,8 @@
const assert = require('assert');
const { errors } = require('arsenal');
const { errors, storage } = require('arsenal');
const AwsClient = require('../../../lib/data/external/AwsClient');
const AwsClient = storage.data.external.AwsClient;
const { config } = require('../../../lib/Config');
const DummyService = require('../DummyService');
const { DummyRequestLogger } = require('../helpers');
@ -105,7 +106,8 @@ describe('AwsClient::copyObject', () => {
testClient._supportsVersioning = test.input.supportsVersioning;
testClient._client.versioning = test.input.enableMockVersioning;
testClient.copyObject(copyObjectRequest, null, key,
sourceLocationConstraint, null, log, err => test.callback(err, done));
sourceLocationConstraint, null, config, log,
err => test.callback(err, done));
}));
});

View File

@ -1,6 +1,6 @@
const assert = require('assert');
const AwsClient = require('arsenal').storage.data.external.AwsClient;
const parseLC = require('../../../lib/data/locationConstraintParser');
const AwsClient = require('../../../lib/data/external/AwsClient');
const inMemory = require('../../../lib/data/in_memory/backend').backend;
const DataFileInterface = require('../../../lib/data/file/backend');

View File

@ -1,7 +1,10 @@
const assert = require('assert');
const utils = require('../../../lib/data/external/utils');
const arsenal = require('arsenal');
const utils = arsenal.storage.data.external.backendUtils;
const BucketInfo = require('arsenal').models.BucketInfo;
const { config } = require('../../../lib/Config');
const userBucketOwner = 'Bart';
const creationDate = new Date().toJSON();
const serverSideEncryption = { cryptoScheme: 123, algorithm: 'algo',
@ -136,7 +139,7 @@ describe('Testing Config.js function: ', () => {
'and destination location constraint ===' +
` ${result.destLocationConstraintName} and ${result.description}`,
done => {
const isCopy = utils.externalBackendCopy(
const isCopy = utils.externalBackendCopy(config,
result.sourceLocationConstraintName,
result.destLocationConstraintName, result.sourceBucketMD,
result.destBucketMD);

View File

@ -1,65 +0,0 @@
const assert = require('assert');
const uuid = require('uuid/v4');
const { createMpuKey, createMpuList } =
require('../../../lib/data/external/GCP').GcpUtils;
const key = `somekey${Date.now()}`;
const uploadId = uuid().replace(/-/g, '');
const phase = 'createMpulist';
const size = 2;
const correctMpuList = [
{ PartName: `${key}-${uploadId}/${phase}/00001`, PartNumber: 1 },
{ PartName: `${key}-${uploadId}/${phase}/00002`, PartNumber: 2 },
];
describe('GcpUtils MPU Helper Functions:', () => {
describe('createMpuKey', () => {
const tests = [
{
it: 'if phase and part number are given',
input: { phase: 'test', partNumber: 1 },
output: `${key}-${uploadId}/test/00001`,
},
{
it: 'if only phase is given',
input: { phase: 'test' },
output: `${key}-${uploadId}/test`,
},
{
it: 'if part number is given',
input: { partNumber: 1 },
output: `${key}-${uploadId}/parts/00001`,
},
{
it: 'if phase and part number aren not given',
input: {},
output: `${key}-${uploadId}/`,
},
];
tests.forEach(test => {
it(test.it, () => {
const { partNumber, phase } = test.input;
assert.strictEqual(createMpuKey(
key, uploadId, partNumber, phase), test.output);
});
});
});
describe('createMpuList', () => {
const tests = [
{
it: 'should create valid mpu list',
input: { phase, size },
output: correctMpuList,
},
];
tests.forEach(test => {
it(test.it, () => {
const { phase, size } = test.input;
assert.deepStrictEqual(createMpuList(
{ Key: key, UploadId: uploadId }, phase, size),
test.output);
});
});
});
});

View File

@ -1,156 +0,0 @@
const assert = require('assert');
const { errors } = require('arsenal');
const { gcpTaggingPrefix } = require('../../../constants');
const { genPutTagObj } =
require('../../../tests/functional/raw-node/utils/gcpUtils');
const { processTagSet, stripTags, retrieveTags, getPutTagsMetadata } =
require('../../../lib/data/external/GCP').GcpUtils;
const maxTagSize = 10;
const validTagSet = genPutTagObj(2);
const validTagObj = {};
validTagObj[`${gcpTaggingPrefix}key0`] = 'Value0';
validTagObj[`${gcpTaggingPrefix}key1`] = 'Value1';
const tagQuery = 'key0=Value0&key1=Value1';
const invalidSizeTagSet = genPutTagObj(maxTagSize + 1);
const invalidDuplicateTagSet = genPutTagObj(maxTagSize, true);
const invalidKeyTagSet = [{ Key: Buffer.alloc(129, 'a'), Value: 'value' }];
const invalidValueTagSet = [{ Key: 'key', Value: Buffer.alloc(257, 'a') }];
const onlyMetadata = {
metadata1: 'metadatavalue1',
metadata2: 'metadatavalue2',
};
const tagMetadata = Object.assign({}, validTagObj, onlyMetadata);
const oldTagMetadata = {};
oldTagMetadata[`${gcpTaggingPrefix}Old`] = 'OldValue0';
const withPriorTags = Object.assign({}, onlyMetadata, oldTagMetadata);
describe('GcpUtils Tagging Helper Functions:', () => {
describe('processTagSet', () => {
const tests = [
{
it: 'should return tag object as metadata for valid tag set',
input: validTagSet,
output: validTagObj,
},
{
it: 'should return error for invalid tag set size',
input: invalidSizeTagSet,
output: errors.BadRequest.customizeDescription(
'Object tags cannot be greater than 10'),
},
{
it: 'should return error for duplicate tag keys',
input: invalidDuplicateTagSet,
output: errors.InvalidTag.customizeDescription(
'Cannot provide multiple Tags with the same key'),
},
{
it: 'should return error for invalid "key" value',
input: invalidKeyTagSet,
output: errors.InvalidTag.customizeDescription(
'The TagKey provided is too long, 129'),
},
{
it: 'should return error for invalid "value" value',
input: invalidValueTagSet,
output: errors.InvalidTag.customizeDescription(
'The TagValue provided is too long, 257'),
},
{
it: 'should return empty tag object when input is undefined',
input: undefined,
output: {},
},
];
tests.forEach(test => {
it(test.it, () => {
assert.deepStrictEqual(processTagSet(test.input), test.output);
});
});
});
describe('stripTags', () => {
const tests = [
{
it: 'should return metadata without tag',
input: tagMetadata,
output: onlyMetadata,
},
{
it: 'should return empty object if metadata only has tags',
input: validTagObj,
output: {},
},
{
it: 'should return empty object if input is undefined',
input: undefined,
output: {},
},
];
tests.forEach(test => {
it(test.it, () => {
assert.deepStrictEqual(stripTags(test.input), test.output);
});
});
});
describe('retrieveTags', () => {
const tests = [
{
it: 'should return tagSet from given input metadata',
input: tagMetadata,
output: validTagSet,
},
{
it: 'should return empty when metadata does not have tags',
input: onlyMetadata,
output: [],
},
{
it: 'should return empty if input is undefined',
input: undefined,
output: [],
},
];
tests.forEach(test => {
it(test.it, () => {
assert.deepStrictEqual(retrieveTags(test.input), test.output);
});
});
});
describe('getPutTagsMetadata', () => {
const tests = [
{
it: 'should return correct object when' +
' given a tag query string and a metadata obj',
input: { metadata: Object.assign({}, onlyMetadata), tagQuery },
output: tagMetadata,
},
{
it: 'should return correct object when given only query string',
input: { tagQuery },
output: validTagObj,
},
{
it: 'should return correct object when only metadata is given',
input: { metadata: onlyMetadata },
output: onlyMetadata,
},
{
it: 'should return metadata with correct tag properties ' +
'if given a metdata with prior tags and query string',
input: { metadata: Object.assign({}, withPriorTags), tagQuery },
output: tagMetadata,
},
];
tests.forEach(test => {
it(test.it, () => {
const { metadata, tagQuery } = test.input;
assert.deepStrictEqual(
getPutTagsMetadata(metadata, tagQuery), test.output);
});
});
});
});

View File

@ -1,63 +0,0 @@
const assert = require('assert');
const { checkExternalBackend } = require('../../../lib/data/external/utils');
const awsLocations = [
'awsbackend',
];
const statusSuccess = {
versioningStatus: 'Enabled',
message: 'Congrats! You own the bucket',
};
const statusFailure = {
versioningStatus: 'Suspended',
error: 'Versioning must be enabled',
external: true,
};
const externalBackendHealthCheckInterval = 10000;
function getClients(isSuccess) {
const status = isSuccess ? statusSuccess : statusFailure;
return {
awsbackend: {
healthcheck: (location, cb) => cb(null, { awsbackend: status }),
},
};
}
describe('Testing _checkExternalBackend', function describeF() {
this.timeout(50000);
beforeEach(done => {
const clients = getClients(true);
return checkExternalBackend(clients, awsLocations, 'aws_s3', false,
externalBackendHealthCheckInterval, done);
});
it('should not refresh response before externalBackendHealthCheckInterval',
done => {
const clients = getClients(false);
return checkExternalBackend(clients, awsLocations, 'aws_s3',
false, externalBackendHealthCheckInterval, (err, res) => {
if (err) {
return done(err);
}
assert.strictEqual(res[0].awsbackend, statusSuccess);
return done();
});
});
it('should refresh response after externalBackendHealthCheckInterval',
done => {
const clients = getClients(false);
setTimeout(() => {
checkExternalBackend(clients, awsLocations, 'aws_s3',
false, externalBackendHealthCheckInterval, (err, res) => {
if (err) {
return done(err);
}
assert.strictEqual(res[0].awsbackend, statusFailure);
return done();
});
}, externalBackendHealthCheckInterval + 1);
});
});

View File

@ -1,55 +0,0 @@
const assert = require('assert');
const proxyCompareUrl = require('../../../lib/data/proxyCompareUrl');
const testCases = [
{
endpoint: 'test.scality.com',
noProxy: '',
expRes: false,
desc: 'no NO_PROXY env var set',
},
{
endpoint: 'test.scality.com',
noProxy: 'test.*.com',
expRes: true,
desc: 'NO_PROXY matches with middle wildcard',
},
{
endpoint: 'test.scality.com',
noProxy: '*.com',
expRes: true,
desc: 'NO_PROXY matches with beginning wildcard',
},
{
endpoint: 'test.scality.com',
noProxy: '.scality.com',
expRes: true,
desc: 'NO_PROXY matches with beginning period',
},
{
endpoint: 'test.scality.com',
noProxy: 'test.nomatch,test.scality.*',
expRes: true,
desc: 'match with wildcard',
},
{
endpoint: 'test.scality.com',
noProxy: 'test.nomatch,no.scality.no,no.*.com,scality.com',
expRes: false,
desc: 'no match',
},
];
describe('proxyCompareURL util function', () => {
testCases.forEach(test => {
it(`should return ${test.expRes} if ${test.desc}`, () => {
process.env.NO_PROXY = test.noProxy;
const proxyMatch = proxyCompareUrl(test.endpoint);
assert.strictEqual(test.expRes, proxyMatch);
});
});
after(() => {
process.env.NO_PROXY = '';
});
});