Compare commits
28 Commits
developmen
...
S3C-8896-O
Author | SHA1 | Date |
---|---|---|
Anurag Mittal | 16973fcbd9 | |
Anurag Mittal | 368283c1f2 | |
Anurag Mittal | 4e6a2eac6f | |
Anurag Mittal | e157222ea7 | |
Anurag Mittal | 716c982b36 | |
Anurag Mittal | 785b893305 | |
Anurag Mittal | f943a5ffa7 | |
Anurag Mittal | 7466d488ea | |
Anurag Mittal | d311934fe7 | |
Anurag Mittal | 308b2176da | |
Anurag Mittal | 4e2c8a01cb | |
Anurag Mittal | 51880c4cf4 | |
Anurag Mittal | 9e6394d25a | |
Anurag Mittal | 2e7ae3a84b | |
Anurag Mittal | ec67fd9fdd | |
Anurag Mittal | 396665daa8 | |
Anurag Mittal | 91d45c2fdf | |
Anurag Mittal | b962782e9f | |
Anurag Mittal | 9fcdd567c8 | |
Anurag Mittal | e02e70a33e | |
Anurag Mittal | 8af82991cd | |
Anurag Mittal | 1b5afd9ce7 | |
Anurag Mittal | 4f677a1797 | |
Anurag Mittal | a3d54d4b61 | |
Anurag Mittal | 785fbb3ee4 | |
Anurag Mittal | 7d8529de75 | |
Anurag Mittal | cddfb8d84b | |
Anurag Mittal | 4ba88274c4 |
|
@ -0,0 +1,28 @@
|
||||||
|
const { ParentBasedSampler, TraceIdRatioBasedSampler, SamplingDecision } = require('@opentelemetry/sdk-trace-base');
|
||||||
|
|
||||||
|
class HealthcheckExcludingSampler {
|
||||||
|
constructor(sampler, excludeHealthcheck) {
|
||||||
|
this._sampler = sampler;
|
||||||
|
this._excludeHealthcheck = excludeHealthcheck;
|
||||||
|
}
|
||||||
|
|
||||||
|
shouldSample(context, traceId, spanName, spanKind, attributes, links) {
|
||||||
|
const url = attributes['http.url'] || '';
|
||||||
|
if (this._excludeHealthcheck && (url.includes('healthcheck') || url.includes('metrics'))) {
|
||||||
|
return { decision: SamplingDecision.NOT_RECORD };
|
||||||
|
}
|
||||||
|
return this._sampler.shouldSample(context, traceId, spanName, spanKind, attributes, links);
|
||||||
|
}
|
||||||
|
|
||||||
|
toString() {
|
||||||
|
return `HealthcheckExcludingSampler{${this._sampler.toString()}}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function createSampler(samplingRatio, excludeHealthcheck) {
|
||||||
|
return new ParentBasedSampler({
|
||||||
|
root: new HealthcheckExcludingSampler(new TraceIdRatioBasedSampler(samplingRatio), excludeHealthcheck),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = { createSampler };
|
|
@ -0,0 +1,69 @@
|
||||||
|
const opentelemetry = require('@opentelemetry/sdk-node');
|
||||||
|
const { Resource } = require('@opentelemetry/resources');
|
||||||
|
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
|
||||||
|
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
|
||||||
|
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-proto');
|
||||||
|
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
|
||||||
|
const { IORedisInstrumentation } = require('@opentelemetry/instrumentation-ioredis');
|
||||||
|
const { createSampler } = require('./customSampler');
|
||||||
|
|
||||||
|
const {
|
||||||
|
SEMRESATTRS_SERVICE_NAME,
|
||||||
|
SEMRESATTRS_SERVICE_VERSION,
|
||||||
|
} = require('@opentelemetry/semantic-conventions');
|
||||||
|
|
||||||
|
// Define resource with service name and version
|
||||||
|
const resource = new Resource({
|
||||||
|
[SEMRESATTRS_SERVICE_NAME]: process.env.OTEL_SERVICE_NAME || 'cisco-s3-cloudserver',
|
||||||
|
[SEMRESATTRS_SERVICE_VERSION]: '7.70.47',
|
||||||
|
});
|
||||||
|
|
||||||
|
const collectorHost = process.env.OTEL_COLLECTOR_HOST || 'localhost';
|
||||||
|
const collectorPort = process.env.OTEL_COLLECTOR_PORT || 4318;
|
||||||
|
const samplingRatio = parseFloat(process.env.OTEL_SAMPLING_RATIO) || 0.05;
|
||||||
|
const excludeHealthcheck = process.env.OTEL_EXCLUDE_HEALTHCHECK || 'true';
|
||||||
|
|
||||||
|
// OTLP Trace Exporter configuration
|
||||||
|
const traceExporter = new OTLPTraceExporter({
|
||||||
|
url: `http://${collectorHost}:${collectorPort}/v1/traces`,
|
||||||
|
headers: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
// Metric Reader configuration
|
||||||
|
const metricReader = new PeriodicExportingMetricReader({
|
||||||
|
exporter: new OTLPMetricExporter({
|
||||||
|
url: `http://${collectorHost}:${collectorPort}/v1/metrics`,
|
||||||
|
headers: {},
|
||||||
|
concurrencyLimit: 1,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
|
||||||
|
// Node SDK configuration
|
||||||
|
const sdk = new opentelemetry.NodeSDK({
|
||||||
|
traceExporter,
|
||||||
|
resource,
|
||||||
|
metricReader,
|
||||||
|
// sampler: new TraceIdRatioBasedSampler(samplingRatio),
|
||||||
|
sampler: createSampler(samplingRatio, excludeHealthcheck),
|
||||||
|
instrumentations: [
|
||||||
|
getNodeAutoInstrumentations({
|
||||||
|
'@opentelemetry/instrumentation-fs': {
|
||||||
|
enabled: false,
|
||||||
|
},
|
||||||
|
'@opentelemetry/instrumentation-http': {
|
||||||
|
responseHook: (span, operations) => {
|
||||||
|
span.updateName(
|
||||||
|
`${operations.req.protocol} ${operations.req.method} ${operations.req.path.split('&')[0]}`);
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
new IORedisInstrumentation({
|
||||||
|
requestHook: (span, { cmdName, cmdArgs }) => {
|
||||||
|
span.updateName(`Redis:: ${cmdName.toUpperCase()} cache operation for ${cmdArgs[0].split(':')[0]}`);
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
],
|
||||||
|
});
|
||||||
|
|
||||||
|
// Start the Node SDK
|
||||||
|
sdk.start();
|
477
lib/api/api.js
477
lib/api/api.js
|
@ -1,5 +1,6 @@
|
||||||
const { auth, errors, policies } = require('arsenal');
|
const { auth, errors, policies } = require('arsenal');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const bucketDelete = require('./bucketDelete');
|
const bucketDelete = require('./bucketDelete');
|
||||||
const bucketDeleteCors = require('./bucketDeleteCors');
|
const bucketDeleteCors = require('./bucketDeleteCors');
|
||||||
|
@ -75,215 +76,297 @@ auth.setHandler(vault);
|
||||||
|
|
||||||
/* eslint-disable no-param-reassign */
|
/* eslint-disable no-param-reassign */
|
||||||
const api = {
|
const api = {
|
||||||
callApiMethod(apiMethod, request, response, log, callback) {
|
callApiMethod(apiMethod, request, response, log, callback, oTel) {
|
||||||
// Attach the apiMethod method to the request, so it can used by monitoring in the server
|
const action = monitoringMap[apiMethod];
|
||||||
// eslint-disable-next-line no-param-reassign
|
const {
|
||||||
request.apiMethod = apiMethod;
|
cloudserverApiSpan,
|
||||||
|
activeSpan,
|
||||||
const actionLog = monitoringMap[apiMethod];
|
activeTracerContext,
|
||||||
if (!actionLog &&
|
tracer,
|
||||||
apiMethod !== 'websiteGet' &&
|
} = oTel;
|
||||||
apiMethod !== 'websiteHead' &&
|
activeSpan.addEvent(`Initiating Cloudserver API Method ${apiMethod}`);
|
||||||
apiMethod !== 'corsPreflight') {
|
// cloudserverApiSpan.updateName(`Performing operations to satisfy ${action} API request. Cloudserver API method: ${apiMethod}()`);
|
||||||
log.error('callApiMethod(): No actionLog for this api method', {
|
cloudserverApiSpan.updateName(`Executing ${action} API Method`);
|
||||||
apiMethod,
|
// TODO: Try out activeTracerContext instead of opentelemetry.context.active()
|
||||||
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
|
cloudserverApiSpan,
|
||||||
|
);
|
||||||
|
return tracer.startActiveSpan(' Setting Up Request Headers and Context for Authentication and Authorization', undefined, cloudserverApiSpanContext, callApiMethod => {
|
||||||
|
callApiMethod.setAttributes({
|
||||||
|
'code.function': 'callApiMethod()',
|
||||||
|
'code.filename': 'lib/api/api.js',
|
||||||
|
'code.lineno': 79,
|
||||||
|
'rpc.method': action,
|
||||||
});
|
});
|
||||||
}
|
// Attach the apiMethod method to the request, so it can used by monitoring in the server
|
||||||
log.addDefaultFields({
|
// eslint-disable-next-line no-param-reassign
|
||||||
service: 's3',
|
request.apiMethod = apiMethod;
|
||||||
action: actionLog,
|
const actionLog = monitoringMap[apiMethod];
|
||||||
bucketName: request.bucketName,
|
if (!actionLog &&
|
||||||
});
|
apiMethod !== 'websiteGet' &&
|
||||||
if (request.objectKey) {
|
apiMethod !== 'websiteHead' &&
|
||||||
|
apiMethod !== 'corsPreflight') {
|
||||||
|
log.error('callApiMethod(): No actionLog for this api method', {
|
||||||
|
apiMethod,
|
||||||
|
});
|
||||||
|
}
|
||||||
log.addDefaultFields({
|
log.addDefaultFields({
|
||||||
objectKey: request.objectKey,
|
service: 's3',
|
||||||
|
action: actionLog,
|
||||||
|
bucketName: request.bucketName,
|
||||||
});
|
});
|
||||||
}
|
if (request.objectKey) {
|
||||||
let returnTagCount = true;
|
log.addDefaultFields({
|
||||||
|
objectKey: request.objectKey,
|
||||||
const validationRes = validateQueryAndHeaders(request, log);
|
});
|
||||||
if (validationRes.error) {
|
}
|
||||||
log.debug('request query / header validation failed', {
|
|
||||||
error: validationRes.error,
|
|
||||||
method: 'api.callApiMethod',
|
|
||||||
});
|
|
||||||
return process.nextTick(callback, validationRes.error);
|
|
||||||
}
|
|
||||||
|
|
||||||
// no need to check auth on website or cors preflight requests
|
|
||||||
if (apiMethod === 'websiteGet' || apiMethod === 'websiteHead' ||
|
|
||||||
apiMethod === 'corsPreflight') {
|
|
||||||
request.actionImplicitDenies = false;
|
|
||||||
return this[apiMethod](request, log, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
const { sourceBucket, sourceObject, sourceVersionId, parsingError } =
|
|
||||||
parseCopySource(apiMethod, request.headers['x-amz-copy-source']);
|
|
||||||
if (parsingError) {
|
|
||||||
log.debug('error parsing copy source', {
|
|
||||||
error: parsingError,
|
|
||||||
});
|
|
||||||
return process.nextTick(callback, parsingError);
|
|
||||||
}
|
|
||||||
|
|
||||||
const { httpHeadersSizeError } = checkHttpHeadersSize(request.headers);
|
|
||||||
if (httpHeadersSizeError) {
|
|
||||||
log.debug('http header size limit exceeded', {
|
|
||||||
error: httpHeadersSizeError,
|
|
||||||
});
|
|
||||||
return process.nextTick(callback, httpHeadersSizeError);
|
|
||||||
}
|
|
||||||
|
|
||||||
const requestContexts = prepareRequestContexts(apiMethod, request,
|
|
||||||
sourceBucket, sourceObject, sourceVersionId);
|
|
||||||
// Extract all the _apiMethods and store them in an array
|
|
||||||
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
|
|
||||||
// Attach the names to the current request
|
|
||||||
// eslint-disable-next-line no-param-reassign
|
|
||||||
request.apiMethods = apiMethods;
|
|
||||||
|
|
||||||
function checkAuthResults(authResults) {
|
|
||||||
let returnTagCount = true;
|
let returnTagCount = true;
|
||||||
const isImplicitDeny = {};
|
|
||||||
let isOnlyImplicitDeny = true;
|
const validationRes = validateQueryAndHeaders(request, log);
|
||||||
if (apiMethod === 'objectGet') {
|
if (validationRes.error) {
|
||||||
// first item checks s3:GetObject(Version) action
|
activeSpan.recordException(validationRes.error);
|
||||||
if (!authResults[0].isAllowed && !authResults[0].isImplicit) {
|
callApiMethod.end();
|
||||||
log.trace('get object authorization denial from Vault');
|
log.debug('request query / header validation failed', {
|
||||||
return errors.AccessDenied;
|
error: validationRes.error,
|
||||||
}
|
method: 'api.callApiMethod',
|
||||||
// TODO add support for returnTagCount in the bucket policy
|
});
|
||||||
// checks
|
return process.nextTick(callback, validationRes.error);
|
||||||
isImplicitDeny[authResults[0].action] = authResults[0].isImplicit;
|
}
|
||||||
// second item checks s3:GetObject(Version)Tagging action
|
|
||||||
if (!authResults[1].isAllowed) {
|
// no need to check auth on website or cors preflight requests
|
||||||
log.trace('get tagging authorization denial ' +
|
if (apiMethod === 'websiteGet' || apiMethod === 'websiteHead' ||
|
||||||
'from Vault');
|
apiMethod === 'corsPreflight') {
|
||||||
returnTagCount = false;
|
callApiMethod.end();
|
||||||
}
|
activeSpan.addEvent(`Forwarding Request to ${apiMethod} Handler`);
|
||||||
} else {
|
request.actionImplicitDenies = false;
|
||||||
for (let i = 0; i < authResults.length; i++) {
|
return this[apiMethod](request, log, callback);
|
||||||
isImplicitDeny[authResults[i].action] = true;
|
}
|
||||||
if (!authResults[i].isAllowed && !authResults[i].isImplicit) {
|
|
||||||
// Any explicit deny rejects the current API call
|
const { sourceBucket, sourceObject, sourceVersionId, parsingError } =
|
||||||
log.trace('authorization denial from Vault');
|
parseCopySource(apiMethod, request.headers['x-amz-copy-source']);
|
||||||
|
if (parsingError) {
|
||||||
|
activeSpan.recordException(parsingError);
|
||||||
|
callApiMethod.end();
|
||||||
|
log.debug('error parsing copy source', {
|
||||||
|
error: parsingError,
|
||||||
|
});
|
||||||
|
return process.nextTick(callback, parsingError);
|
||||||
|
}
|
||||||
|
|
||||||
|
const { httpHeadersSizeError } = checkHttpHeadersSize(request.headers);
|
||||||
|
if (httpHeadersSizeError) {
|
||||||
|
activeSpan.recordException(httpHeadersSizeError);
|
||||||
|
callApiMethod.end();
|
||||||
|
log.debug('http header size limit exceeded', {
|
||||||
|
error: httpHeadersSizeError,
|
||||||
|
});
|
||||||
|
return process.nextTick(callback, httpHeadersSizeError);
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Initializing Authentication and Authorization Request Contexts');
|
||||||
|
const requestContexts = prepareRequestContexts(apiMethod, request,
|
||||||
|
sourceBucket, sourceObject, sourceVersionId);
|
||||||
|
activeSpan.addEvent('Authentication and Authorization Request Contexts Initialized');
|
||||||
|
// Extract all the _apiMethods and store them in an array
|
||||||
|
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
|
||||||
|
// Attach the names to the current request
|
||||||
|
// eslint-disable-next-line no-param-reassign
|
||||||
|
request.apiMethods = apiMethods;
|
||||||
|
|
||||||
|
function checkAuthResults(authResults) {
|
||||||
|
let returnTagCount = true;
|
||||||
|
const isImplicitDeny = {};
|
||||||
|
let isOnlyImplicitDeny = true;
|
||||||
|
if (apiMethod === 'objectGet') {
|
||||||
|
// first item checks s3:GetObject(Version) action
|
||||||
|
if (!authResults[0].isAllowed && !authResults[0].isImplicit) {
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
|
log.trace('get object authorization denial from Vault');
|
||||||
return errors.AccessDenied;
|
return errors.AccessDenied;
|
||||||
}
|
}
|
||||||
if (authResults[i].isAllowed) {
|
// TODO add support for returnTagCount in the bucket policy
|
||||||
// If the action is allowed, the result is not implicit
|
// checks
|
||||||
// Deny.
|
isImplicitDeny[authResults[0].action] = authResults[0].isImplicit;
|
||||||
isImplicitDeny[authResults[i].action] = false;
|
// second item checks s3:GetObject(Version)Tagging action
|
||||||
isOnlyImplicitDeny = false;
|
if (!authResults[1].isAllowed) {
|
||||||
|
log.trace('get tagging authorization denial ' +
|
||||||
|
'from Vault');
|
||||||
|
returnTagCount = false;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (let i = 0; i < authResults.length; i++) {
|
||||||
|
isImplicitDeny[authResults[i].action] = true;
|
||||||
|
if (!authResults[i].isAllowed && !authResults[i].isImplicit) {
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
|
// Any explicit deny rejects the current API call
|
||||||
|
log.trace('authorization denial from Vault');
|
||||||
|
return errors.AccessDenied;
|
||||||
|
}
|
||||||
|
if (authResults[i].isAllowed) {
|
||||||
|
// If the action is allowed, the result is not implicit
|
||||||
|
// Deny.
|
||||||
|
isImplicitDeny[authResults[i].action] = false;
|
||||||
|
isOnlyImplicitDeny = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// These two APIs cannot use ACLs or Bucket Policies, hence, any
|
||||||
|
// implicit deny from vault must be treated as an explicit deny.
|
||||||
|
if ((apiMethod === 'bucketPut' || apiMethod === 'serviceGet') && isOnlyImplicitDeny) {
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
|
return errors.AccessDenied;
|
||||||
|
}
|
||||||
|
return { returnTagCount, isImplicitDeny };
|
||||||
}
|
}
|
||||||
// These two APIs cannot use ACLs or Bucket Policies, hence, any
|
callApiMethod.end();
|
||||||
// implicit deny from vault must be treated as an explicit deny.
|
activeSpan.addEvent('Starting User Authentication and Authorization');
|
||||||
if ((apiMethod === 'bucketPut' || apiMethod === 'serviceGet') && isOnlyImplicitDeny) {
|
return async.waterfall([
|
||||||
return errors.AccessDenied;
|
next => tracer.startActiveSpan('Authenticating and Authorizing the Request with Vault', undefined, cloudserverApiSpanContext, iamAuthSpan => {
|
||||||
}
|
const iamAuthSpanContext = opentelemetry.trace.setSpan(
|
||||||
return { returnTagCount, isImplicitDeny };
|
cloudserverApiSpanContext,
|
||||||
}
|
iamAuthSpan,
|
||||||
|
);
|
||||||
return async.waterfall([
|
activeSpan.addEvent('Starting Vault Authentication and Authorization');
|
||||||
next => auth.server.doAuth(
|
iamAuthSpan.setAttributes({
|
||||||
request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
|
'code.function': 'callApiMethod()',
|
||||||
if (err) {
|
'code.filename': 'lib/api/api.js',
|
||||||
log.trace('authentication error', { error: err });
|
'code.lineno': 220,
|
||||||
return next(err);
|
'rpc.method': 'AuthV4',
|
||||||
}
|
});
|
||||||
return next(null, userInfo, authorizationResults, streamingV4Params);
|
return auth.server.doAuth(
|
||||||
}, 's3', requestContexts),
|
request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
|
||||||
(userInfo, authorizationResults, streamingV4Params, next) => {
|
activeSpan.addEvent('Received Response from Vault');
|
||||||
const authNames = { accountName: userInfo.getAccountDisplayName() };
|
iamAuthSpan.end();
|
||||||
if (userInfo.isRequesterAnIAMUser()) {
|
if (err) {
|
||||||
authNames.userName = userInfo.getIAMdisplayName();
|
activeSpan.recordException(err);
|
||||||
}
|
log.trace('authentication error', { error: err });
|
||||||
log.addDefaultFields(authNames);
|
return next(err);
|
||||||
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
|
}
|
||||||
return next(null, userInfo, authorizationResults, streamingV4Params);
|
return next(null, userInfo, authorizationResults, streamingV4Params);
|
||||||
}
|
}, 's3', requestContexts, { activeSpan, activeTracerContext: iamAuthSpanContext, tracer });
|
||||||
// issue 100 Continue to the client
|
}),
|
||||||
writeContinue(request, response);
|
(userInfo, authorizationResults, streamingV4Params, next) => tracer.startActiveSpan('Process Vault Response', undefined, cloudserverApiSpanContext, currentSpan => {
|
||||||
const MAX_POST_LENGTH = request.method === 'POST' ?
|
currentSpan.setAttributes({
|
||||||
1024 * 1024 : 1024 * 1024 / 2; // 1 MB or 512 KB
|
'code.function': 'callApiMethod()',
|
||||||
const post = [];
|
'code.filename': 'lib/api/api.js',
|
||||||
let postLength = 0;
|
'code.lineno': 220,
|
||||||
request.on('data', chunk => {
|
});
|
||||||
postLength += chunk.length;
|
activeSpan.addEvent('Processing Vault Response');
|
||||||
// Sanity check on post length
|
const authNames = { accountName: userInfo.getAccountDisplayName() };
|
||||||
if (postLength <= MAX_POST_LENGTH) {
|
if (userInfo.isRequesterAnIAMUser()) {
|
||||||
post.push(chunk);
|
authNames.userName = userInfo.getIAMdisplayName();
|
||||||
}
|
}
|
||||||
});
|
log.addDefaultFields(authNames);
|
||||||
|
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
|
||||||
request.on('error', err => {
|
currentSpan.end();
|
||||||
log.trace('error receiving request', {
|
return next(null, userInfo, authorizationResults, streamingV4Params);
|
||||||
error: err,
|
}
|
||||||
|
// issue 100 Continue to the client
|
||||||
|
writeContinue(request, response);
|
||||||
|
const MAX_POST_LENGTH = request.method === 'POST' ?
|
||||||
|
1024 * 1024 : 1024 * 1024 / 2; // 1 MB or 512 KB
|
||||||
|
const post = [];
|
||||||
|
let postLength = 0;
|
||||||
|
request.on('data', chunk => {
|
||||||
|
postLength += chunk.length;
|
||||||
|
// Sanity check on post length
|
||||||
|
if (postLength <= MAX_POST_LENGTH) {
|
||||||
|
post.push(chunk);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
return next(errors.InternalError);
|
|
||||||
});
|
|
||||||
|
|
||||||
request.on('end', () => {
|
request.on('error', err => {
|
||||||
if (postLength > MAX_POST_LENGTH) {
|
activeSpan.recordException(err);
|
||||||
log.error('body length is too long for request type',
|
currentSpan.end();
|
||||||
{ postLength });
|
log.trace('error receiving request', {
|
||||||
return next(errors.InvalidRequest);
|
error: err,
|
||||||
}
|
});
|
||||||
// Convert array of post buffers into one string
|
return next(errors.InternalError);
|
||||||
request.post = Buffer.concat(post, postLength).toString();
|
});
|
||||||
return next(null, userInfo, authorizationResults, streamingV4Params);
|
|
||||||
});
|
request.on('end', () => {
|
||||||
return undefined;
|
if (postLength > MAX_POST_LENGTH) {
|
||||||
},
|
activeSpan.recordException(errors.InvalidRequest);
|
||||||
// Tag condition keys require information from CloudServer for evaluation
|
currentSpan.end();
|
||||||
(userInfo, authorizationResults, streamingV4Params, next) => tagConditionKeyAuth(
|
log.error('body length is too long for request type',
|
||||||
authorizationResults,
|
{ postLength });
|
||||||
request,
|
return next(errors.InvalidRequest);
|
||||||
requestContexts,
|
}
|
||||||
apiMethod,
|
// Convert array of post buffers into one string
|
||||||
log,
|
request.post = Buffer.concat(post, postLength).toString();
|
||||||
(err, authResultsWithTags) => {
|
activeSpan.addEvent('Vault Response Processed');
|
||||||
if (err) {
|
return next(null, userInfo, authorizationResults, streamingV4Params);
|
||||||
log.trace('tag authentication error', { error: err });
|
});
|
||||||
return next(err);
|
return undefined;
|
||||||
}
|
}),
|
||||||
return next(null, userInfo, authResultsWithTags, streamingV4Params);
|
// Tag condition keys require information from CloudServer for evaluation
|
||||||
},
|
(userInfo, authorizationResults, streamingV4Params, next) => tracer.startActiveSpan('Validate Tag condition keys for Authorization', undefined, cloudserverApiSpanContext, currentSpan => {
|
||||||
),
|
activeSpan.addEvent('Validating Tag Conditions in Authorization');
|
||||||
], (err, userInfo, authorizationResults, streamingV4Params) => {
|
return tagConditionKeyAuth(
|
||||||
if (err) {
|
authorizationResults,
|
||||||
return callback(err);
|
request,
|
||||||
}
|
requestContexts,
|
||||||
if (authorizationResults) {
|
apiMethod,
|
||||||
const checkedResults = checkAuthResults(authorizationResults);
|
log,
|
||||||
if (checkedResults instanceof Error) {
|
(err, authResultsWithTags) => {
|
||||||
return callback(checkedResults);
|
currentSpan.end();
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
log.trace('tag authentication error', { error: err });
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, userInfo, authResultsWithTags, streamingV4Params);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}),
|
||||||
|
], (err, userInfo, authorizationResults, streamingV4Params) => {
|
||||||
|
activeSpan.addEvent('Vault Authentication and Authorization Completed');
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
return callback(err);
|
||||||
}
|
}
|
||||||
returnTagCount = checkedResults.returnTagCount;
|
if (authorizationResults) {
|
||||||
request.actionImplicitDenies = checkedResults.isImplicitDeny;
|
activeSpan.addEvent('Verifying Authorization Results');
|
||||||
} else {
|
const checkedResults = checkAuthResults(authorizationResults);
|
||||||
// create an object of keys apiMethods with all values to false:
|
if (checkedResults instanceof Error) {
|
||||||
// for backward compatibility, all apiMethods are allowed by default
|
activeSpan.recordException(checkedResults);
|
||||||
// thus it is explicitly allowed, so implicit deny is false
|
return callback(checkedResults);
|
||||||
request.actionImplicitDenies = apiMethods.reduce((acc, curr) => {
|
}
|
||||||
acc[curr] = false;
|
activeSpan.addEvent('Authorization Results Verified');
|
||||||
return acc;
|
returnTagCount = checkedResults.returnTagCount;
|
||||||
}, {});
|
request.actionImplicitDenies = checkedResults.isImplicitDeny;
|
||||||
}
|
} else {
|
||||||
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
|
// create an object of keys apiMethods with all values to false:
|
||||||
request._response = response;
|
// for backward compatibility, all apiMethods are allowed by default
|
||||||
return this[apiMethod](userInfo, request, streamingV4Params,
|
// thus it is explicitly allowed, so implicit deny is false
|
||||||
log, callback, authorizationResults);
|
request.actionImplicitDenies = apiMethods.reduce((acc, curr) => {
|
||||||
}
|
acc[curr] = false;
|
||||||
if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
|
return acc;
|
||||||
return this[apiMethod](userInfo, request, sourceBucket,
|
}, {});
|
||||||
sourceObject, sourceVersionId, log, callback);
|
}
|
||||||
}
|
cloudserverApiSpan.setAttributes({
|
||||||
if (apiMethod === 'objectGet') {
|
'code.lineno': 95,
|
||||||
return this[apiMethod](userInfo, request, returnTagCount, log, callback);
|
'code.filename': 'lib/api/api.js',
|
||||||
}
|
'code.function': `callApiMethod().${apiMethod}()`,
|
||||||
return this[apiMethod](userInfo, request, log, callback);
|
});
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.apiMethod': apiMethod,
|
||||||
|
});
|
||||||
|
activeSpan.addEvent(`Forwarding Request to ${apiMethod} Handler`);
|
||||||
|
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
|
||||||
|
request._response = response;
|
||||||
|
return this[apiMethod](userInfo, request, streamingV4Params,
|
||||||
|
log, callback, authorizationResults, oTel);
|
||||||
|
}
|
||||||
|
if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
|
||||||
|
return this[apiMethod](userInfo, request, sourceBucket,
|
||||||
|
sourceObject, sourceVersionId, log, callback, oTel);
|
||||||
|
}
|
||||||
|
if (apiMethod === 'objectGet') {
|
||||||
|
return this[apiMethod](userInfo, request, returnTagCount, log, callback, oTel);
|
||||||
|
}
|
||||||
|
return this[apiMethod](userInfo, request, log, callback, oTel);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
bucketDelete,
|
bucketDelete,
|
||||||
|
|
|
@ -8,7 +8,7 @@ const { standardMetadataValidateBucketAndObj } =
|
||||||
const services = require('../../../services');
|
const services = require('../../../services');
|
||||||
|
|
||||||
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
|
function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
|
||||||
callback, request) {
|
callback, request, oTel) {
|
||||||
const metadataValMPUparams = {
|
const metadataValMPUparams = {
|
||||||
authInfo,
|
authInfo,
|
||||||
bucketName,
|
bucketName,
|
||||||
|
@ -42,7 +42,7 @@ function abortMultipartUpload(authInfo, bucketName, objectKey, uploadId, log,
|
||||||
'bucketPolicyGoAhead';
|
'bucketPolicyGoAhead';
|
||||||
}
|
}
|
||||||
return next(null, destinationBucket);
|
return next(null, destinationBucket);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function checkMPUval(destBucket, next) {
|
function checkMPUval(destBucket, next) {
|
||||||
metadataValParams.log = log;
|
metadataValParams.log = log;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { errors, s3middleware } = require('arsenal');
|
const { errors, s3middleware } = require('arsenal');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
|
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
|
||||||
|
|
||||||
const constants = require('../../../../constants');
|
const constants = require('../../../../constants');
|
||||||
|
@ -46,20 +47,62 @@ function _checkAndApplyZenkoMD(metaHeaders, request, isDeleteMarker) {
|
||||||
}
|
}
|
||||||
|
|
||||||
function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
||||||
metadataStoreParams, dataToDelete, log, requestMethod, callback) {
|
metadataStoreParams, dataToDelete, log, requestMethod, callback, oTel) {
|
||||||
services.metadataStoreObject(bucketName, dataGetInfo,
|
const { activeSpan, activeTracerContext, tracer } = oTel;
|
||||||
cipherBundle, metadataStoreParams, (err, result) => {
|
return async.waterfall([
|
||||||
if (err) {
|
next => tracer.startActiveSpan('Update Metadata for the stored object', undefined, activeTracerContext, currentSpan => {
|
||||||
return callback(err);
|
currentSpan.setAttributes({
|
||||||
}
|
'code.function': '_storeInMDandDeleteData.storeMetadata()',
|
||||||
|
'code.filename': 'lib/api/apiUtils/object/createAndStoreObject.js',
|
||||||
|
'code.lineno': 62,
|
||||||
|
});
|
||||||
|
return next(null, currentSpan);
|
||||||
|
}),
|
||||||
|
(currentSpan, next) => {
|
||||||
|
const context = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
|
currentSpan,
|
||||||
|
);
|
||||||
|
services.metadataStoreObject(bucketName, dataGetInfo, cipherBundle, metadataStoreParams, (err, result) => {
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, result, currentSpan);
|
||||||
|
}, { activeSpan, activeTracerContext: context, tracer });
|
||||||
|
},
|
||||||
|
(result, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Stored Metadata');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, result);
|
||||||
|
},
|
||||||
|
(result, next) => tracer.startActiveSpan('Batch delete Old Data if Necessary', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': '_storeInMDandDeleteData.deleteOldData()',
|
||||||
|
'code.filename': 'lib/api/apiUtils/object/createAndStoreObject.js',
|
||||||
|
'code.lineno': 80,
|
||||||
|
});
|
||||||
|
return next(null, result, currentSpan);
|
||||||
|
}),
|
||||||
|
(result, currentSpan, next) => {
|
||||||
if (dataToDelete) {
|
if (dataToDelete) {
|
||||||
const newDataStoreName = Array.isArray(dataGetInfo) ?
|
activeSpan.addEvent('Batch delete Old Data');
|
||||||
dataGetInfo[0].dataStoreName : null;
|
const newDataStoreName = Array.isArray(dataGetInfo) ? dataGetInfo[0].dataStoreName : null;
|
||||||
return data.batchDelete(dataToDelete, requestMethod,
|
data.batchDelete(dataToDelete, requestMethod, newDataStoreName, log, err => {
|
||||||
newDataStoreName, log, err => callback(err, result));
|
next(err, result, currentSpan);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
activeSpan.addEvent('No Old Data to Delete');
|
||||||
|
next(null, result, currentSpan);
|
||||||
}
|
}
|
||||||
return callback(null, result);
|
},
|
||||||
});
|
(result, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Deleted Old Data');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, result);
|
||||||
|
},
|
||||||
|
], callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** createAndStoreObject - store data, store metadata, and delete old data
|
/** createAndStoreObject - store data, store metadata, and delete old data
|
||||||
|
@ -78,14 +121,23 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
||||||
* credentialScope (to be used for streaming v4 auth if applicable)
|
* credentialScope (to be used for streaming v4 auth if applicable)
|
||||||
* @param {(object|null)} overheadField - fields to be included in metadata overhead
|
* @param {(object|null)} overheadField - fields to be included in metadata overhead
|
||||||
* @param {RequestLogger} log - logger instance
|
* @param {RequestLogger} log - logger instance
|
||||||
* @param {function} callback - callback function
|
* @param {function} callback - callback function]
|
||||||
|
* @param {object} oTel - OpenTelemetry methods
|
||||||
* @return {undefined} and call callback with (err, result) -
|
* @return {undefined} and call callback with (err, result) -
|
||||||
* result.contentMD5 - content md5 of new object or version
|
* result.contentMD5 - content md5 of new object or version
|
||||||
* result.versionId - unencrypted versionId returned by metadata
|
* result.versionId - unencrypted versionId returned by metadata
|
||||||
*/
|
*/
|
||||||
function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
canonicalID, cipherBundle, request, isDeleteMarker, streamingV4Params,
|
canonicalID, cipherBundle, request, isDeleteMarker, streamingV4Params,
|
||||||
overheadField, log, callback) {
|
overheadField, log, callback, oTel) {
|
||||||
|
const { activeSpan, activeTracerContext, tracer } = oTel;
|
||||||
|
const objectStorageParamSpan = tracer.startSpan('Validating Storage Request Parameters', undefined, activeTracerContext);
|
||||||
|
objectStorageParamSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject()',
|
||||||
|
'code.filename': 'lib/api/apiUtils/object/createAndStoreObject.js',
|
||||||
|
'code.lineno': 87,
|
||||||
|
});
|
||||||
|
activeSpan.addEvent('Entered createAndStoreObject()');
|
||||||
const size = isDeleteMarker ? 0 : request.parsedContentLength;
|
const size = isDeleteMarker ? 0 : request.parsedContentLength;
|
||||||
// although the request method may actually be 'DELETE' if creating a
|
// although the request method may actually be 'DELETE' if creating a
|
||||||
// delete marker, for our purposes we consider this to be a 'PUT'
|
// delete marker, for our purposes we consider this to be a 'PUT'
|
||||||
|
@ -97,8 +149,11 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
const err = errors.InvalidRedirectLocation;
|
const err = errors.InvalidRedirectLocation;
|
||||||
log.debug('invalid x-amz-website-redirect-location' +
|
log.debug('invalid x-amz-website-redirect-location' +
|
||||||
`value ${websiteRedirectHeader}`, { error: err });
|
`value ${websiteRedirectHeader}`, { error: err });
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
objectStorageParamSpan.end();
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Validated x-amz-website-redirect-location Header');
|
||||||
|
|
||||||
const metaHeaders = isDeleteMarker ? [] : getMetaHeaders(request.headers);
|
const metaHeaders = isDeleteMarker ? [] : getMetaHeaders(request.headers);
|
||||||
if (metaHeaders instanceof Error) {
|
if (metaHeaders instanceof Error) {
|
||||||
|
@ -106,11 +161,15 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
error: metaHeaders,
|
error: metaHeaders,
|
||||||
method: 'createAndStoreObject',
|
method: 'createAndStoreObject',
|
||||||
});
|
});
|
||||||
|
activeSpan.recordException(metaHeaders);
|
||||||
|
objectStorageParamSpan.end();
|
||||||
return process.nextTick(() => callback(metaHeaders));
|
return process.nextTick(() => callback(metaHeaders));
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Extracted User Provided Metadata Headers');
|
||||||
// if receiving a request from Zenko for a delete marker, we place a
|
// if receiving a request from Zenko for a delete marker, we place a
|
||||||
// user-metadata field on the object
|
// user-metadata field on the object
|
||||||
_checkAndApplyZenkoMD(metaHeaders, request, isDeleteMarker);
|
_checkAndApplyZenkoMD(metaHeaders, request, isDeleteMarker);
|
||||||
|
activeSpan.addEvent('Applied Zenko Metadata If Request From Zenko');
|
||||||
|
|
||||||
log.trace('meta headers', { metaHeaders, method: 'objectPut' });
|
log.trace('meta headers', { metaHeaders, method: 'objectPut' });
|
||||||
const objectKeyContext = {
|
const objectKeyContext = {
|
||||||
|
@ -140,6 +199,7 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
overheadField,
|
overheadField,
|
||||||
log,
|
log,
|
||||||
};
|
};
|
||||||
|
activeSpan.addEvent('Set Metadata Store Parameters And Set Object Key Context');
|
||||||
|
|
||||||
if (!isDeleteMarker) {
|
if (!isDeleteMarker) {
|
||||||
metadataStoreParams.contentType = request.headers['content-type'];
|
metadataStoreParams.contentType = request.headers['content-type'];
|
||||||
|
@ -157,6 +217,7 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
metadataStoreParams.defaultRetention
|
metadataStoreParams.defaultRetention
|
||||||
= defaultObjectLockConfiguration;
|
= defaultObjectLockConfiguration;
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('DeleteMarker Not True, Set Metadata Store Request Headers');
|
||||||
}
|
}
|
||||||
|
|
||||||
// if creating new delete marker and there is an existing object, copy
|
// if creating new delete marker and there is an existing object, copy
|
||||||
|
@ -166,21 +227,27 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
request.headers[constants.objectLocationConstraintHeader] =
|
request.headers[constants.objectLocationConstraintHeader] =
|
||||||
objMD[constants.objectLocationConstraintHeader];
|
objMD[constants.objectLocationConstraintHeader];
|
||||||
metadataStoreParams.originOp = 's3:ObjectRemoved:DeleteMarkerCreated';
|
metadataStoreParams.originOp = 's3:ObjectRemoved:DeleteMarkerCreated';
|
||||||
|
activeSpan.addEvent('DeleteMarker True And Object Metadata Preset, Set Metadata Store Request Headers');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const backendInfoObj =
|
const backendInfoObj =
|
||||||
locationConstraintCheck(request, null, bucketMD, log);
|
locationConstraintCheck(request, null, bucketMD, log);
|
||||||
if (backendInfoObj.err) {
|
if (backendInfoObj.err) {
|
||||||
return process.nextTick(() => {
|
return process.nextTick(() => {
|
||||||
|
activeSpan.recordException(backendInfoObj.err);
|
||||||
|
objectStorageParamSpan.end();
|
||||||
callback(backendInfoObj.err);
|
callback(backendInfoObj.err);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Checked Location Constraint');
|
||||||
|
|
||||||
const backendInfo = backendInfoObj.backendInfo;
|
const backendInfo = backendInfoObj.backendInfo;
|
||||||
const location = backendInfo.getControllingLocationConstraint();
|
const location = backendInfo.getControllingLocationConstraint();
|
||||||
const locationType = backendInfoObj.defaultedToDataBackend ? location :
|
const locationType = backendInfoObj.defaultedToDataBackend ? location :
|
||||||
config.getLocationConstraintType(location);
|
config.getLocationConstraintType(location);
|
||||||
metadataStoreParams.dataStoreName = location;
|
metadataStoreParams.dataStoreName = location;
|
||||||
|
activeSpan.addEvent('Set Backend Info and Location');
|
||||||
|
|
||||||
if (versioningNotImplBackends[locationType]) {
|
if (versioningNotImplBackends[locationType]) {
|
||||||
const vcfg = bucketMD.getVersioningConfiguration();
|
const vcfg = bucketMD.getVersioningConfiguration();
|
||||||
|
@ -190,32 +257,66 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
log.debug(externalVersioningErrorMessage,
|
log.debug(externalVersioningErrorMessage,
|
||||||
{ method: 'createAndStoreObject', error: errors.NotImplemented });
|
{ method: 'createAndStoreObject', error: errors.NotImplemented });
|
||||||
return process.nextTick(() => {
|
return process.nextTick(() => {
|
||||||
|
activeSpan.recordException(backendInfoObj.err);
|
||||||
|
objectStorageParamSpan.end(errors.NotImplemented.customizeDescription(
|
||||||
|
externalVersioningErrorMessage
|
||||||
|
));
|
||||||
callback(errors.NotImplemented.customizeDescription(
|
callback(errors.NotImplemented.customizeDescription(
|
||||||
externalVersioningErrorMessage));
|
externalVersioningErrorMessage));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Checked Versioning');
|
||||||
|
|
||||||
if (objMD && objMD.uploadId) {
|
if (objMD && objMD.uploadId) {
|
||||||
metadataStoreParams.oldReplayId = objMD.uploadId;
|
metadataStoreParams.oldReplayId = objMD.uploadId;
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Checked for Upload ID');
|
||||||
|
|
||||||
/* eslint-disable camelcase */
|
/* eslint-disable camelcase */
|
||||||
const dontSkipBackend = externalBackends;
|
const dontSkipBackend = externalBackends;
|
||||||
/* eslint-enable camelcase */
|
/* eslint-enable camelcase */
|
||||||
|
objectStorageParamSpan.end();
|
||||||
return async.waterfall([
|
return async.waterfall([
|
||||||
function storeData(next) {
|
next => tracer.startActiveSpan('Updating object Data in RING Using sproxyd', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject.storeData()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 246,
|
||||||
|
});
|
||||||
|
return next(null, currentSpan);
|
||||||
|
}),
|
||||||
|
function storeData(currentSpan, next) {
|
||||||
if (size === 0 && !dontSkipBackend[locationType]) {
|
if (size === 0 && !dontSkipBackend[locationType]) {
|
||||||
metadataStoreParams.contentMD5 = constants.emptyFileMd5;
|
metadataStoreParams.contentMD5 = constants.emptyFileMd5;
|
||||||
return next(null, null, null);
|
return next(null, null, null, currentSpan);
|
||||||
}
|
}
|
||||||
return dataStore(objectKeyContext, cipherBundle, request, size,
|
return dataStore(objectKeyContext, cipherBundle, request, size,
|
||||||
streamingV4Params, backendInfo, log, next);
|
streamingV4Params, backendInfo, log, (err, dataGetInfo, calculatedHash) => {
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, dataGetInfo, calculatedHash, currentSpan);
|
||||||
|
});
|
||||||
},
|
},
|
||||||
function processDataResult(dataGetInfo, calculatedHash, next) {
|
(dataGetInfo, calculatedHash, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Stored Data');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, dataGetInfo, calculatedHash);
|
||||||
|
},
|
||||||
|
(dataGetInfo, calculatedHash, next) => tracer.startActiveSpan('Processing Data Storage Result', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject.processDataResult()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 273,
|
||||||
|
});
|
||||||
|
return next(null, dataGetInfo, calculatedHash, currentSpan);
|
||||||
|
}),
|
||||||
|
function processDataResult(dataGetInfo, calculatedHash, currentSpan, next) {
|
||||||
if (dataGetInfo === null || dataGetInfo === undefined) {
|
if (dataGetInfo === null || dataGetInfo === undefined) {
|
||||||
return next(null, null);
|
return next(null, null, currentSpan);
|
||||||
}
|
}
|
||||||
// So that data retrieval information for MPU's and
|
// So that data retrieval information for MPU's and
|
||||||
// regular puts are stored in the same data structure,
|
// regular puts are stored in the same data structure,
|
||||||
|
@ -234,9 +335,22 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
cipherBundle.cipheredDataKey;
|
cipherBundle.cipheredDataKey;
|
||||||
}
|
}
|
||||||
metadataStoreParams.contentMD5 = calculatedHash;
|
metadataStoreParams.contentMD5 = calculatedHash;
|
||||||
return next(null, dataGetInfoArr);
|
return next(null, dataGetInfoArr, currentSpan);
|
||||||
},
|
},
|
||||||
function getVersioningInfo(infoArr, next) {
|
(infoArr, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Processed Data Result');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, infoArr);
|
||||||
|
},
|
||||||
|
(infoArr, next) => tracer.startActiveSpan('Fetching Versioning Information', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject.getVersioningInfo()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 310,
|
||||||
|
});
|
||||||
|
return next(null, infoArr, currentSpan);
|
||||||
|
}),
|
||||||
|
function getVersioningInfo(infoArr, currentSpan, next) {
|
||||||
return versioningPreprocessing(bucketName, bucketMD,
|
return versioningPreprocessing(bucketName, bucketMD,
|
||||||
metadataStoreParams.objectKey, objMD, log, (err, options) => {
|
metadataStoreParams.objectKey, objMD, log, (err, options) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -249,10 +363,27 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
method: 'versioningPreprocessing',
|
method: 'versioningPreprocessing',
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return next(err, options, infoArr);
|
return next(err, options, infoArr, currentSpan);
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
function storeMDAndDeleteData(options, infoArr, next) {
|
(options, infoArr, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Got Versioning Info');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, options, infoArr);
|
||||||
|
},
|
||||||
|
(options, infoArr, next) => tracer.startActiveSpan('Updating Metadata and Deleting Old Data', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject.storeMDAndDeleteData()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 339,
|
||||||
|
});
|
||||||
|
return next(null, options, infoArr, currentSpan);
|
||||||
|
}),
|
||||||
|
function storeMDAndDeleteData(options, infoArr, currentSpan, next) {
|
||||||
|
const context = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
|
currentSpan,
|
||||||
|
);
|
||||||
metadataStoreParams.versionId = options.versionId;
|
metadataStoreParams.versionId = options.versionId;
|
||||||
metadataStoreParams.versioning = options.versioning;
|
metadataStoreParams.versioning = options.versioning;
|
||||||
metadataStoreParams.isNull = options.isNull;
|
metadataStoreParams.isNull = options.isNull;
|
||||||
|
@ -262,7 +393,20 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||||
}
|
}
|
||||||
return _storeInMDandDeleteData(bucketName, infoArr,
|
return _storeInMDandDeleteData(bucketName, infoArr,
|
||||||
cipherBundle, metadataStoreParams,
|
cipherBundle, metadataStoreParams,
|
||||||
options.dataToDelete, log, requestMethod, next);
|
options.dataToDelete, log, requestMethod,
|
||||||
|
(err, result) => {
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, result, currentSpan);
|
||||||
|
}, { activeSpan, activeTracerContext: context, tracer });
|
||||||
|
},
|
||||||
|
(result, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Stored Metadata and Deleted Old Data');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, result);
|
||||||
},
|
},
|
||||||
], callback);
|
], callback);
|
||||||
}
|
}
|
||||||
|
|
|
@ -273,13 +273,13 @@ function hasGovernanceBypassHeader(headers) {
|
||||||
* @param {function} cb - callback returns errors.AccessDenied if the authorization fails
|
* @param {function} cb - callback returns errors.AccessDenied if the authorization fails
|
||||||
* @returns {undefined} -
|
* @returns {undefined} -
|
||||||
*/
|
*/
|
||||||
function checkUserGovernanceBypass(request, authInfo, bucketMD, objectKey, log, cb) {
|
function checkUserGovernanceBypass(request, authInfo, bucketMD, objectKey, log, cb, oTel) {
|
||||||
log.trace(
|
log.trace(
|
||||||
'object in GOVERNANCE mode and is user, checking for attached policies',
|
'object in GOVERNANCE mode and is user, checking for attached policies',
|
||||||
{ method: 'checkUserPolicyGovernanceBypass' },
|
{ method: 'checkUserPolicyGovernanceBypass' },
|
||||||
);
|
);
|
||||||
|
|
||||||
const authParams = auth.server.extractParams(request, log, 's3', request.query);
|
const authParams = auth.server.extractParams(request, log, 's3', request.query, oTel);
|
||||||
const ip = policies.requestUtils.getClientIp(request, config);
|
const ip = policies.requestUtils.getClientIp(request, config);
|
||||||
const requestContextParams = {
|
const requestContextParams = {
|
||||||
constantParams: {
|
constantParams: {
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const querystring = require('querystring');
|
const querystring = require('querystring');
|
||||||
const { errors, versioning, s3middleware } = require('arsenal');
|
const { errors, versioning, s3middleware } = require('arsenal');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
const services = require('../services');
|
const services = require('../services');
|
||||||
|
@ -288,108 +289,145 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||||
* with either error code or xml response body
|
* with either error code or xml response body
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function bucketGet(authInfo, request, log, callback) {
|
function bucketGet(authInfo, request, log, callback, oTel) {
|
||||||
const params = request.query;
|
const {
|
||||||
const bucketName = request.bucketName;
|
cloudserverApiSpan,
|
||||||
const v2 = params['list-type'];
|
activeSpan,
|
||||||
if (v2 !== undefined && Number.parseInt(v2, 10) !== 2) {
|
activeTracerContext,
|
||||||
return callback(errors.InvalidArgument.customizeDescription('Invalid ' +
|
tracer,
|
||||||
'List Type specified in Request'));
|
} = oTel;
|
||||||
}
|
activeSpan.addEvent('bucketGet: bucketGet called');
|
||||||
if (v2) {
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
log.addDefaultFields({
|
activeTracerContext,
|
||||||
action: 'ListObjectsV2',
|
cloudserverApiSpan,
|
||||||
|
);
|
||||||
|
return tracer.startActiveSpan('ListObjectsOrVersions - Processing Bucket Get Request', undefined, cloudserverApiSpanContext, listObjectsSpan => {
|
||||||
|
activeSpan.addEvent('bucketGet: processing request');
|
||||||
|
listObjectsSpan.setAttributes({
|
||||||
|
'code.filepath': 'lib/api/bucketGet.js',
|
||||||
|
'code.function': 'bucketGet',
|
||||||
|
'code.lineno': 304,
|
||||||
});
|
});
|
||||||
} else if (params.versions !== undefined) {
|
const listObjectsSpanContext = opentelemetry.trace.setSpan(
|
||||||
log.addDefaultFields({
|
cloudserverApiSpanContext,
|
||||||
action: 'ListObjectVersions',
|
listObjectsSpan,
|
||||||
});
|
);
|
||||||
}
|
const params = request.query;
|
||||||
log.debug('processing request', { method: 'bucketGet' });
|
const bucketName = request.bucketName;
|
||||||
const encoding = params['encoding-type'];
|
const v2 = params['list-type'];
|
||||||
if (encoding !== undefined && encoding !== 'url') {
|
if (v2 !== undefined && Number.parseInt(v2, 10) !== 2) {
|
||||||
monitoring.promMetrics(
|
activeSpan.recordException(errors.InvalidArgument);
|
||||||
'GET', bucketName, 400, 'listBucket');
|
return callback(errors.InvalidArgument.customizeDescription('Invalid ' +
|
||||||
return callback(errors.InvalidArgument.customizeDescription('Invalid ' +
|
'List Type specified in Request'));
|
||||||
'Encoding Method specified in Request'));
|
}
|
||||||
}
|
if (v2) {
|
||||||
const requestMaxKeys = params['max-keys'] ?
|
activeSpan.setAttributes({
|
||||||
Number.parseInt(params['max-keys'], 10) : 1000;
|
'aws.s3.apiMethod': 'bucketGet - ListObjectsV2',
|
||||||
if (Number.isNaN(requestMaxKeys) || requestMaxKeys < 0) {
|
});
|
||||||
monitoring.promMetrics(
|
log.addDefaultFields({
|
||||||
'GET', bucketName, 400, 'listBucket');
|
action: 'ListObjectsV2',
|
||||||
return callback(errors.InvalidArgument);
|
});
|
||||||
}
|
} else if (params.versions !== undefined) {
|
||||||
// AWS only returns 1000 keys even if max keys are greater.
|
log.addDefaultFields({
|
||||||
// Max keys stated in response xml can be greater than actual
|
action: 'ListObjectVersions',
|
||||||
// keys returned.
|
});
|
||||||
const actualMaxKeys = Math.min(constants.listingHardLimit, requestMaxKeys);
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.apiMethod': 'bucketGet - ListObjectVersions',
|
||||||
const metadataValParams = {
|
});
|
||||||
authInfo,
|
}
|
||||||
bucketName,
|
log.debug('processing request', { method: 'bucketGet' });
|
||||||
requestType: request.apiMethods || 'bucketGet',
|
const encoding = params['encoding-type'];
|
||||||
request,
|
if (encoding !== undefined && encoding !== 'url') {
|
||||||
};
|
|
||||||
const listParams = {
|
|
||||||
listingType: 'DelimiterMaster',
|
|
||||||
maxKeys: actualMaxKeys,
|
|
||||||
prefix: params.prefix,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (params.delimiter) {
|
|
||||||
listParams.delimiter = params.delimiter;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (v2) {
|
|
||||||
listParams.v2 = true;
|
|
||||||
listParams.startAfter = params['start-after'];
|
|
||||||
listParams.continuationToken =
|
|
||||||
decryptToken(params['continuation-token']);
|
|
||||||
listParams.fetchOwner = params['fetch-owner'] === 'true';
|
|
||||||
} else {
|
|
||||||
listParams.marker = params.marker;
|
|
||||||
}
|
|
||||||
|
|
||||||
standardMetadataValidateBucket(metadataValParams, request.actionImplicitDenies, log, (err, bucket) => {
|
|
||||||
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
|
||||||
request.method, bucket);
|
|
||||||
if (err) {
|
|
||||||
log.debug('error processing request', { error: err });
|
|
||||||
monitoring.promMetrics(
|
monitoring.promMetrics(
|
||||||
'GET', bucketName, err.code, 'listBucket');
|
'GET', bucketName, 400, 'listBucket');
|
||||||
return callback(err, null, corsHeaders);
|
activeSpan.recordException(errors.InvalidArgument);
|
||||||
|
return callback(errors.InvalidArgument.customizeDescription('Invalid ' +
|
||||||
|
'Encoding Method specified in Request'));
|
||||||
}
|
}
|
||||||
if (params.versions !== undefined) {
|
const requestMaxKeys = params['max-keys'] ?
|
||||||
listParams.listingType = 'DelimiterVersions';
|
Number.parseInt(params['max-keys'], 10) : 1000;
|
||||||
delete listParams.marker;
|
if (Number.isNaN(requestMaxKeys) || requestMaxKeys < 0) {
|
||||||
listParams.keyMarker = params['key-marker'];
|
monitoring.promMetrics(
|
||||||
listParams.versionIdMarker = params['version-id-marker'] ?
|
'GET', bucketName, 400, 'listBucket');
|
||||||
versionIdUtils.decode(params['version-id-marker']) : undefined;
|
activeSpan.recordException(errors.InvalidArgument);
|
||||||
|
return callback(errors.InvalidArgument);
|
||||||
}
|
}
|
||||||
if (!requestMaxKeys) {
|
// AWS only returns 1000 keys even if max keys are greater.
|
||||||
const emptyList = {
|
// Max keys stated in response xml can be greater than actual
|
||||||
CommonPrefixes: [],
|
// keys returned.
|
||||||
Contents: [],
|
const actualMaxKeys = Math.min(constants.listingHardLimit, requestMaxKeys);
|
||||||
Versions: [],
|
|
||||||
IsTruncated: false,
|
const metadataValParams = {
|
||||||
};
|
authInfo,
|
||||||
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
bucketName,
|
||||||
bucketName, emptyList, corsHeaders, log, callback);
|
requestType: request.apiMethods || 'bucketGet',
|
||||||
|
request,
|
||||||
|
};
|
||||||
|
const listParams = {
|
||||||
|
listingType: 'DelimiterMaster',
|
||||||
|
maxKeys: actualMaxKeys,
|
||||||
|
prefix: params.prefix,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (params.delimiter) {
|
||||||
|
listParams.delimiter = params.delimiter;
|
||||||
}
|
}
|
||||||
return services.getObjectListing(bucketName, listParams, log,
|
|
||||||
(err, list) => {
|
if (v2) {
|
||||||
|
listParams.v2 = true;
|
||||||
|
listParams.startAfter = params['start-after'];
|
||||||
|
listParams.continuationToken =
|
||||||
|
decryptToken(params['continuation-token']);
|
||||||
|
listParams.fetchOwner = params['fetch-owner'] === 'true';
|
||||||
|
} else {
|
||||||
|
listParams.marker = params.marker;
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('bucketGet: processed request');
|
||||||
|
return standardMetadataValidateBucket(metadataValParams, request.actionImplicitDenies, log, (err, bucket) => {
|
||||||
|
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
||||||
|
request.method, bucket);
|
||||||
if (err) {
|
if (err) {
|
||||||
log.debug('error processing request', { error: err });
|
log.debug('error processing request', { error: err });
|
||||||
monitoring.promMetrics(
|
monitoring.promMetrics(
|
||||||
'GET', bucketName, err.code, 'listBucket');
|
'GET', bucketName, err.code, 'listBucket');
|
||||||
return callback(err, null, corsHeaders);
|
return callback(err, null, corsHeaders);
|
||||||
}
|
}
|
||||||
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
if (params.versions !== undefined) {
|
||||||
bucketName, list, corsHeaders, log, callback);
|
listParams.listingType = 'DelimiterVersions';
|
||||||
});
|
delete listParams.marker;
|
||||||
|
listParams.keyMarker = params['key-marker'];
|
||||||
|
listParams.versionIdMarker = params['version-id-marker'] ?
|
||||||
|
versionIdUtils.decode(params['version-id-marker']) : undefined;
|
||||||
|
}
|
||||||
|
if (!requestMaxKeys) {
|
||||||
|
const emptyList = {
|
||||||
|
CommonPrefixes: [],
|
||||||
|
Contents: [],
|
||||||
|
Versions: [],
|
||||||
|
IsTruncated: false,
|
||||||
|
};
|
||||||
|
listObjectsSpan.end();
|
||||||
|
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||||
|
bucketName, emptyList, corsHeaders, log, callback);
|
||||||
|
}
|
||||||
|
return services.getObjectListing(bucketName, listParams, log,
|
||||||
|
(err, list) => {
|
||||||
|
listObjectsSpan.end();
|
||||||
|
if (err) {
|
||||||
|
log.debug('error processing request', { error: err });
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, err.code, 'listBucket');
|
||||||
|
return callback(err, null, corsHeaders);
|
||||||
|
}
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.objects.count': list.Contents ? list.Contents.length : 0,
|
||||||
|
// 'aws.s3.action': 'bucketGet',
|
||||||
|
});
|
||||||
|
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||||
|
bucketName, list, corsHeaders, log, callback);
|
||||||
|
});
|
||||||
|
}, { activeSpan, activeTracerContext: listObjectsSpanContext, tracer });
|
||||||
});
|
});
|
||||||
return undefined;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = bucketGet;
|
module.exports = bucketGet;
|
||||||
|
|
|
@ -197,7 +197,7 @@ function authBucketPut(authParams, bucketName, locationConstraint, request, auth
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function bucketPut(authInfo, request, log, callback) {
|
function bucketPut(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'bucketPut' });
|
log.debug('processing request', { method: 'bucketPut' });
|
||||||
|
|
||||||
if (authInfo.isRequesterPublicUser()) {
|
if (authInfo.isRequesterPublicUser()) {
|
||||||
|
@ -230,7 +230,7 @@ function bucketPut(authInfo, request, log, callback) {
|
||||||
return next(null, locationConstraint);
|
return next(null, locationConstraint);
|
||||||
}
|
}
|
||||||
|
|
||||||
const authParams = auth.server.extractParams(request, log, 's3', request.query);
|
const authParams = auth.server.extractParams(request, log, 's3', request.query, oTel);
|
||||||
const requestConstantParams = authBucketPut(
|
const requestConstantParams = authBucketPut(
|
||||||
authParams, bucketName, locationConstraint, request, authInfo
|
authParams, bucketName, locationConstraint, request, authInfo
|
||||||
);
|
);
|
||||||
|
|
|
@ -67,7 +67,7 @@ const REPLICATION_ACTION = 'MPU';
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function completeMultipartUpload(authInfo, request, log, callback) {
|
function completeMultipartUpload(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'completeMultipartUpload' });
|
log.debug('processing request', { method: 'completeMultipartUpload' });
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
const objectKey = request.objectKey;
|
const objectKey = request.objectKey;
|
||||||
|
@ -119,7 +119,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
||||||
requestType: request.apiMethods || 'completeMultipartUpload',
|
requestType: request.apiMethods || 'completeMultipartUpload',
|
||||||
request,
|
request,
|
||||||
};
|
};
|
||||||
standardMetadataValidateBucketAndObj(metadataValParams, request.actionImplicitDenies, log, next);
|
standardMetadataValidateBucketAndObj(metadataValParams, request.actionImplicitDenies, log, next, oTel);
|
||||||
},
|
},
|
||||||
function validateMultipart(destBucket, objMD, next) {
|
function validateMultipart(destBucket, objMD, next) {
|
||||||
if (objMD) {
|
if (objMD) {
|
||||||
|
@ -434,7 +434,7 @@ function completeMultipartUpload(authInfo, request, log, callback) {
|
||||||
return next(null, mpuBucket, keysToDelete, aggregateETag,
|
return next(null, mpuBucket, keysToDelete, aggregateETag,
|
||||||
extraPartLocations, destinationBucket,
|
extraPartLocations, destinationBucket,
|
||||||
generatedVersionId);
|
generatedVersionId);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
|
function deletePartsMetadata(mpuBucket, keysToDelete, aggregateETag,
|
||||||
extraPartLocations, destinationBucket, generatedVersionId, next) {
|
extraPartLocations, destinationBucket, generatedVersionId, next) {
|
||||||
|
|
|
@ -44,7 +44,7 @@ Sample xml response:
|
||||||
* @return {undefined} calls callback from router
|
* @return {undefined} calls callback from router
|
||||||
* with err and result as arguments
|
* with err and result as arguments
|
||||||
*/
|
*/
|
||||||
function initiateMultipartUpload(authInfo, request, log, callback) {
|
function initiateMultipartUpload(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'initiateMultipartUpload' });
|
log.debug('processing request', { method: 'initiateMultipartUpload' });
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
const objectKey = request.objectKey;
|
const objectKey = request.objectKey;
|
||||||
|
@ -275,7 +275,7 @@ function initiateMultipartUpload(authInfo, request, log, callback) {
|
||||||
return next(error, corsHeaders);
|
return next(error, corsHeaders);
|
||||||
}
|
}
|
||||||
return next(null, corsHeaders, destinationBucket);
|
return next(null, corsHeaders, destinationBucket);
|
||||||
}),
|
}, oTel),
|
||||||
(corsHeaders, destinationBucket, next) => {
|
(corsHeaders, destinationBucket, next) => {
|
||||||
if (destinationBucket.hasDeletedFlag() && accountCanonicalID !== destinationBucket.getOwner()) {
|
if (destinationBucket.hasDeletedFlag() && accountCanonicalID !== destinationBucket.getOwner()) {
|
||||||
log.trace('deleted flag on bucket and request from non-owner account');
|
log.trace('deleted flag on bucket and request from non-owner account');
|
||||||
|
|
|
@ -73,7 +73,7 @@ function buildXML(xmlParams, xml, encodingFn) {
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function listParts(authInfo, request, log, callback) {
|
function listParts(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'listParts' });
|
log.debug('processing request', { method: 'listParts' });
|
||||||
|
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
|
@ -131,7 +131,7 @@ function listParts(authInfo, request, log, callback) {
|
||||||
'bucketPolicyGoAhead';
|
'bucketPolicyGoAhead';
|
||||||
}
|
}
|
||||||
return next(null, destinationBucket);
|
return next(null, destinationBucket);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function waterfall2(destBucket, next) {
|
function waterfall2(destBucket, next) {
|
||||||
metadataValMPUparams.log = log;
|
metadataValMPUparams.log = log;
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
const crypto = require('crypto');
|
const crypto = require('crypto');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { parseString } = require('xml2js');
|
const { parseString } = require('xml2js');
|
||||||
|
@ -246,202 +247,222 @@ function initializeMultiObjectDeleteWithBatchingSupport(bucketName, inPlay, log,
|
||||||
* successfullyDeleted, totalContentLengthDeleted)
|
* successfullyDeleted, totalContentLengthDeleted)
|
||||||
*/
|
*/
|
||||||
function getObjMetadataAndDelete(authInfo, canonicalID, request,
|
function getObjMetadataAndDelete(authInfo, canonicalID, request,
|
||||||
bucketName, bucket, quietSetting, errorResults, inPlay, log, next) {
|
bucketName, bucket, quietSetting, errorResults, inPlay, log, next, oTel) {
|
||||||
const successfullyDeleted = [];
|
const {
|
||||||
let totalContentLengthDeleted = 0;
|
currentSpan,
|
||||||
let numOfObjectsRemoved = 0;
|
activeSpan,
|
||||||
const skipError = new Error('skip');
|
activeTracerContext,
|
||||||
const objectLockedError = new Error('object locked');
|
tracer,
|
||||||
let deleteFromStorage = [];
|
} = oTel;
|
||||||
|
return tracer.startActiveSpan('Get Object Metadata and Delete', undefined, activeTracerContext, getObjMetadataAndDeleteSpan => {
|
||||||
|
getObjMetadataAndDeleteSpan.setAttributes({
|
||||||
|
'code.function': 'getObjMetadataAndDelete()',
|
||||||
|
'code.filename': 'lib/api/multiObjectDelete.js',
|
||||||
|
'code.lineno': 221,
|
||||||
|
});
|
||||||
|
activeSpan.addEvent('Getting Object Metadata and Deleting Objects');
|
||||||
|
const successfullyDeleted = [];
|
||||||
|
let totalContentLengthDeleted = 0;
|
||||||
|
let numOfObjectsRemoved = 0;
|
||||||
|
const skipError = new Error('skip');
|
||||||
|
const objectLockedError = new Error('object locked');
|
||||||
|
let deleteFromStorage = [];
|
||||||
|
|
||||||
return async.waterfall([
|
return async.waterfall([
|
||||||
callback => initializeMultiObjectDeleteWithBatchingSupport(bucketName, inPlay, log, callback),
|
callback => initializeMultiObjectDeleteWithBatchingSupport(bucketName, inPlay, log, callback),
|
||||||
(cache, callback) => async.forEachLimit(inPlay, config.multiObjectDeleteConcurrency, (entry, moveOn) => {
|
(cache, callback) => async.forEachLimit(inPlay, config.multiObjectDeleteConcurrency, (entry, moveOn) => {
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
callback => callback(...decodeObjectVersion(entry, bucketName)),
|
callback => callback(...decodeObjectVersion(entry, bucketName)),
|
||||||
// for obj deletes, no need to check acl's at object level
|
// for obj deletes, no need to check acl's at object level
|
||||||
// (authority is at the bucket level for obj deletes)
|
// (authority is at the bucket level for obj deletes)
|
||||||
(versionId, callback) => metadataUtils.metadataGetObject(bucketName, entry.key,
|
(versionId, callback) => metadataUtils.metadataGetObject(bucketName, entry.key,
|
||||||
versionId, cache, log, (err, objMD) => callback(err, objMD, versionId)),
|
versionId, cache, log, (err, objMD) => callback(err, objMD, versionId)),
|
||||||
(objMD, versionId, callback) => {
|
(objMD, versionId, callback) => {
|
||||||
if (!objMD) {
|
if (!objMD) {
|
||||||
const verCfg = bucket.getVersioningConfiguration();
|
const verCfg = bucket.getVersioningConfiguration();
|
||||||
// To adhere to AWS behavior, create a delete marker
|
// To adhere to AWS behavior, create a delete marker
|
||||||
// if trying to delete an object that does not exist
|
// if trying to delete an object that does not exist
|
||||||
// when versioning has been configured
|
// when versioning has been configured
|
||||||
if (verCfg && !entry.versionId) {
|
if (verCfg && !entry.versionId) {
|
||||||
log.debug('trying to delete specific version ' +
|
log.debug('trying to delete specific version ' +
|
||||||
'that does not exist');
|
'that does not exist');
|
||||||
|
return callback(null, objMD, versionId);
|
||||||
|
}
|
||||||
|
// otherwise if particular key does not exist, AWS
|
||||||
|
// returns success for key so add to successfullyDeleted
|
||||||
|
// list and move on
|
||||||
|
successfullyDeleted.push({ entry });
|
||||||
|
return callback(skipError);
|
||||||
|
}
|
||||||
|
if (versionId && objMD.location &&
|
||||||
|
Array.isArray(objMD.location) && objMD.location[0]) {
|
||||||
|
// we need this information for data deletes to AWS
|
||||||
|
// eslint-disable-next-line no-param-reassign
|
||||||
|
objMD.location[0].deleteVersion = true;
|
||||||
|
}
|
||||||
|
return callback(null, objMD, versionId);
|
||||||
|
},
|
||||||
|
(objMD, versionId, callback) => {
|
||||||
|
// AWS only returns an object lock error if a version id
|
||||||
|
// is specified, else continue to create a delete marker
|
||||||
|
if (!versionId || !bucket.isObjectLockEnabled()) {
|
||||||
|
return callback(null, null, objMD, versionId);
|
||||||
|
}
|
||||||
|
const hasGovernanceBypass = hasGovernanceBypassHeader(request.headers);
|
||||||
|
if (hasGovernanceBypass && authInfo.isRequesterAnIAMUser()) {
|
||||||
|
return checkUserGovernanceBypass(request, authInfo, bucket, entry.key, log, error => {
|
||||||
|
if (error && error.is.AccessDenied) {
|
||||||
|
log.debug('user does not have BypassGovernanceRetention and object is locked',
|
||||||
|
{ error });
|
||||||
|
return callback(objectLockedError);
|
||||||
|
}
|
||||||
|
if (error) {
|
||||||
|
return callback(error);
|
||||||
|
}
|
||||||
|
return callback(null, hasGovernanceBypass, objMD, versionId);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return callback(null, hasGovernanceBypass, objMD, versionId);
|
||||||
|
},
|
||||||
|
(hasGovernanceBypass, objMD, versionId, callback) => {
|
||||||
|
// AWS only returns an object lock error if a version id
|
||||||
|
// is specified, else continue to create a delete marker
|
||||||
|
if (!versionId || !bucket.isObjectLockEnabled()) {
|
||||||
return callback(null, objMD, versionId);
|
return callback(null, objMD, versionId);
|
||||||
}
|
}
|
||||||
// otherwise if particular key does not exist, AWS
|
const objLockInfo = new ObjectLockInfo({
|
||||||
// returns success for key so add to successfullyDeleted
|
mode: objMD.retentionMode,
|
||||||
// list and move on
|
date: objMD.retentionDate,
|
||||||
successfullyDeleted.push({ entry });
|
legalHold: objMD.legalHold || false,
|
||||||
return callback(skipError);
|
|
||||||
}
|
|
||||||
if (versionId && objMD.location &&
|
|
||||||
Array.isArray(objMD.location) && objMD.location[0]) {
|
|
||||||
// we need this information for data deletes to AWS
|
|
||||||
// eslint-disable-next-line no-param-reassign
|
|
||||||
objMD.location[0].deleteVersion = true;
|
|
||||||
}
|
|
||||||
return callback(null, objMD, versionId);
|
|
||||||
},
|
|
||||||
(objMD, versionId, callback) => {
|
|
||||||
// AWS only returns an object lock error if a version id
|
|
||||||
// is specified, else continue to create a delete marker
|
|
||||||
if (!versionId || !bucket.isObjectLockEnabled()) {
|
|
||||||
return callback(null, null, objMD, versionId);
|
|
||||||
}
|
|
||||||
const hasGovernanceBypass = hasGovernanceBypassHeader(request.headers);
|
|
||||||
if (hasGovernanceBypass && authInfo.isRequesterAnIAMUser()) {
|
|
||||||
return checkUserGovernanceBypass(request, authInfo, bucket, entry.key, log, error => {
|
|
||||||
if (error && error.is.AccessDenied) {
|
|
||||||
log.debug('user does not have BypassGovernanceRetention and object is locked',
|
|
||||||
{ error });
|
|
||||||
return callback(objectLockedError);
|
|
||||||
}
|
|
||||||
if (error) {
|
|
||||||
return callback(error);
|
|
||||||
}
|
|
||||||
return callback(null, hasGovernanceBypass, objMD, versionId);
|
|
||||||
});
|
});
|
||||||
}
|
|
||||||
return callback(null, hasGovernanceBypass, objMD, versionId);
|
// If the object can not be deleted raise an error
|
||||||
},
|
if (!objLockInfo.canModifyObject(hasGovernanceBypass)) {
|
||||||
(hasGovernanceBypass, objMD, versionId, callback) => {
|
log.debug('trying to delete locked object');
|
||||||
// AWS only returns an object lock error if a version id
|
return callback(objectLockedError);
|
||||||
// is specified, else continue to create a delete marker
|
}
|
||||||
if (!versionId || !bucket.isObjectLockEnabled()) {
|
|
||||||
return callback(null, objMD, versionId);
|
return callback(null, objMD, versionId);
|
||||||
|
},
|
||||||
|
(objMD, versionId, callback) => {
|
||||||
|
const options = preprocessingVersioningDelete(
|
||||||
|
bucketName, bucket, objMD, versionId, config.nullVersionCompatMode);
|
||||||
|
const deleteInfo = {};
|
||||||
|
if (options && options.deleteData) {
|
||||||
|
options.overheadField = overheadField;
|
||||||
|
deleteInfo.deleted = true;
|
||||||
|
if (!_bucketRequiresOplogUpdate(bucket)) {
|
||||||
|
options.doesNotNeedOpogUpdate = true;
|
||||||
|
}
|
||||||
|
if (objMD.uploadId) {
|
||||||
|
// eslint-disable-next-line
|
||||||
|
options.replayId = objMD.uploadId;
|
||||||
|
}
|
||||||
|
return services.deleteObject(bucketName, objMD,
|
||||||
|
entry.key, options, config.multiObjectDeleteEnableOptimizations, log, (err, toDelete) => {
|
||||||
|
if (err) {
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
if (toDelete) {
|
||||||
|
deleteFromStorage = deleteFromStorage.concat(toDelete);
|
||||||
|
}
|
||||||
|
return callback(null, objMD, deleteInfo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
deleteInfo.newDeleteMarker = true;
|
||||||
|
// This call will create a delete-marker
|
||||||
|
return createAndStoreObject(bucketName, bucket, entry.key,
|
||||||
|
objMD, authInfo, canonicalID, null, request,
|
||||||
|
deleteInfo.newDeleteMarker, null, overheadField, log, (err, result) =>
|
||||||
|
callback(err, objMD, deleteInfo, result.versionId), oTel);
|
||||||
|
},
|
||||||
|
], (err, objMD, deleteInfo, versionId) => {
|
||||||
|
if (err === skipError) {
|
||||||
|
return moveOn();
|
||||||
|
} else if (err === objectLockedError) {
|
||||||
|
errorResults.push({ entry, error: errors.AccessDenied, objectLocked: true });
|
||||||
|
return moveOn();
|
||||||
|
} else if (err) {
|
||||||
|
log.error('error deleting object', { error: err, entry });
|
||||||
|
errorResults.push({ entry, error: err });
|
||||||
|
return moveOn();
|
||||||
}
|
}
|
||||||
const objLockInfo = new ObjectLockInfo({
|
if (deleteInfo.deleted && objMD['content-length']) {
|
||||||
mode: objMD.retentionMode,
|
numOfObjectsRemoved++;
|
||||||
date: objMD.retentionDate,
|
totalContentLengthDeleted += objMD['content-length'];
|
||||||
legalHold: objMD.legalHold || false,
|
}
|
||||||
|
let isDeleteMarker;
|
||||||
|
let deleteMarkerVersionId;
|
||||||
|
// - If trying to delete an object that does not exist (if a new
|
||||||
|
// delete marker was created)
|
||||||
|
// - Or if an object exists but no version was specified
|
||||||
|
// return DeleteMarkerVersionId equals the versionID of the marker
|
||||||
|
// you just generated and DeleteMarker tag equals true
|
||||||
|
if (deleteInfo.newDeleteMarker) {
|
||||||
|
isDeleteMarker = true;
|
||||||
|
deleteMarkerVersionId = versionIdUtils.encode(
|
||||||
|
versionId, config.versionIdEncodingType);
|
||||||
|
// In this case we are putting a new object (i.e., the delete
|
||||||
|
// marker), so we decrement the numOfObjectsRemoved value.
|
||||||
|
numOfObjectsRemoved--;
|
||||||
|
// If trying to delete a delete marker, DeleteMarkerVersionId equals
|
||||||
|
// deleteMarker's versionID and DeleteMarker equals true
|
||||||
|
} else if (objMD && objMD.isDeleteMarker) {
|
||||||
|
isDeleteMarker = true;
|
||||||
|
deleteMarkerVersionId = entry.versionId;
|
||||||
|
}
|
||||||
|
successfullyDeleted.push({
|
||||||
|
entry, isDeleteMarker,
|
||||||
|
deleteMarkerVersionId,
|
||||||
});
|
});
|
||||||
|
|
||||||
// If the object can not be deleted raise an error
|
|
||||||
if (!objLockInfo.canModifyObject(hasGovernanceBypass)) {
|
|
||||||
log.debug('trying to delete locked object');
|
|
||||||
return callback(objectLockedError);
|
|
||||||
}
|
|
||||||
|
|
||||||
return callback(null, objMD, versionId);
|
|
||||||
},
|
|
||||||
(objMD, versionId, callback) => {
|
|
||||||
const options = preprocessingVersioningDelete(
|
|
||||||
bucketName, bucket, objMD, versionId, config.nullVersionCompatMode);
|
|
||||||
const deleteInfo = {};
|
|
||||||
if (options && options.deleteData) {
|
|
||||||
options.overheadField = overheadField;
|
|
||||||
deleteInfo.deleted = true;
|
|
||||||
if (!_bucketRequiresOplogUpdate(bucket)) {
|
|
||||||
options.doesNotNeedOpogUpdate = true;
|
|
||||||
}
|
|
||||||
if (objMD.uploadId) {
|
|
||||||
// eslint-disable-next-line
|
|
||||||
options.replayId = objMD.uploadId;
|
|
||||||
}
|
|
||||||
return services.deleteObject(bucketName, objMD,
|
|
||||||
entry.key, options, config.multiObjectDeleteEnableOptimizations, log, (err, toDelete) => {
|
|
||||||
if (err) {
|
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
if (toDelete) {
|
|
||||||
deleteFromStorage = deleteFromStorage.concat(toDelete);
|
|
||||||
}
|
|
||||||
return callback(null, objMD, deleteInfo);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
deleteInfo.newDeleteMarker = true;
|
|
||||||
// This call will create a delete-marker
|
|
||||||
return createAndStoreObject(bucketName, bucket, entry.key,
|
|
||||||
objMD, authInfo, canonicalID, null, request,
|
|
||||||
deleteInfo.newDeleteMarker, null, overheadField, log, (err, result) =>
|
|
||||||
callback(err, objMD, deleteInfo, result.versionId));
|
|
||||||
},
|
|
||||||
], (err, objMD, deleteInfo, versionId) => {
|
|
||||||
if (err === skipError) {
|
|
||||||
return moveOn();
|
return moveOn();
|
||||||
} else if (err === objectLockedError) {
|
|
||||||
errorResults.push({ entry, error: errors.AccessDenied, objectLocked: true });
|
|
||||||
return moveOn();
|
|
||||||
} else if (err) {
|
|
||||||
log.error('error deleting object', { error: err, entry });
|
|
||||||
errorResults.push({ entry, error: err });
|
|
||||||
return moveOn();
|
|
||||||
}
|
|
||||||
if (deleteInfo.deleted && objMD['content-length']) {
|
|
||||||
numOfObjectsRemoved++;
|
|
||||||
totalContentLengthDeleted += objMD['content-length'];
|
|
||||||
}
|
|
||||||
let isDeleteMarker;
|
|
||||||
let deleteMarkerVersionId;
|
|
||||||
// - If trying to delete an object that does not exist (if a new
|
|
||||||
// delete marker was created)
|
|
||||||
// - Or if an object exists but no version was specified
|
|
||||||
// return DeleteMarkerVersionId equals the versionID of the marker
|
|
||||||
// you just generated and DeleteMarker tag equals true
|
|
||||||
if (deleteInfo.newDeleteMarker) {
|
|
||||||
isDeleteMarker = true;
|
|
||||||
deleteMarkerVersionId = versionIdUtils.encode(
|
|
||||||
versionId, config.versionIdEncodingType);
|
|
||||||
// In this case we are putting a new object (i.e., the delete
|
|
||||||
// marker), so we decrement the numOfObjectsRemoved value.
|
|
||||||
numOfObjectsRemoved--;
|
|
||||||
// If trying to delete a delete marker, DeleteMarkerVersionId equals
|
|
||||||
// deleteMarker's versionID and DeleteMarker equals true
|
|
||||||
} else if (objMD && objMD.isDeleteMarker) {
|
|
||||||
isDeleteMarker = true;
|
|
||||||
deleteMarkerVersionId = entry.versionId;
|
|
||||||
}
|
|
||||||
successfullyDeleted.push({
|
|
||||||
entry, isDeleteMarker,
|
|
||||||
deleteMarkerVersionId,
|
|
||||||
});
|
});
|
||||||
return moveOn();
|
},
|
||||||
});
|
// end of forEach func
|
||||||
},
|
err => {
|
||||||
// end of forEach func
|
// Batch delete all objects
|
||||||
err => {
|
const onDone = () => {
|
||||||
// Batch delete all objects
|
activeSpan.addEvent('Got Object Metadata and Deleted Objects');
|
||||||
const onDone = () => callback(err, quietSetting, errorResults, numOfObjectsRemoved,
|
currentSpan.end();
|
||||||
successfullyDeleted, totalContentLengthDeleted, bucket);
|
return callback(err, quietSetting, errorResults, numOfObjectsRemoved,
|
||||||
|
successfullyDeleted, totalContentLengthDeleted, bucket)
|
||||||
|
};
|
||||||
|
|
||||||
if (err && deleteFromStorage.length === 0) {
|
if (err && deleteFromStorage.length === 0) {
|
||||||
log.trace('no objects to delete from data backend');
|
log.trace('no objects to delete from data backend');
|
||||||
return onDone();
|
|
||||||
}
|
|
||||||
// If error but we have objects in the list, delete them to ensure
|
|
||||||
// consistent state.
|
|
||||||
log.trace('deleting objects from data backend');
|
|
||||||
|
|
||||||
// Split the array into chunks
|
|
||||||
const chunks = [];
|
|
||||||
while (deleteFromStorage.length > 0) {
|
|
||||||
chunks.push(deleteFromStorage.splice(0, config.multiObjectDeleteConcurrency));
|
|
||||||
}
|
|
||||||
|
|
||||||
return async.each(chunks, (chunk, done) => data.batchDelete(chunk, null, null,
|
|
||||||
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), done),
|
|
||||||
err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('error deleting objects from data backend', { error: err });
|
|
||||||
return onDone(err);
|
|
||||||
}
|
|
||||||
return onDone();
|
return onDone();
|
||||||
});
|
}
|
||||||
}),
|
// If error but we have objects in the list, delete them to ensure
|
||||||
], (err, ...results) => {
|
// consistent state.
|
||||||
// if general error from metadata return error
|
log.trace('deleting objects from data backend');
|
||||||
if (err) {
|
|
||||||
monitoring.promMetrics('DELETE', bucketName, err.code,
|
// Split the array into chunks
|
||||||
'multiObjectDelete');
|
const chunks = [];
|
||||||
return next(err);
|
while (deleteFromStorage.length > 0) {
|
||||||
}
|
chunks.push(deleteFromStorage.splice(0, config.multiObjectDeleteConcurrency));
|
||||||
return next(null, ...results);
|
}
|
||||||
|
|
||||||
|
return async.each(chunks, (chunk, done) => data.batchDelete(chunk, null, null,
|
||||||
|
logger.newRequestLoggerFromSerializedUids(log.getSerializedUids()), done),
|
||||||
|
err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('error deleting objects from data backend', { error: err });
|
||||||
|
return onDone(err);
|
||||||
|
}
|
||||||
|
return onDone();
|
||||||
|
});
|
||||||
|
}),
|
||||||
|
], (err, ...results) => {
|
||||||
|
activeSpan.addEvent('Got Object Metadata and Deleted Objects');
|
||||||
|
getObjMetadataAndDeleteSpan.end();
|
||||||
|
// if general error from metadata return error
|
||||||
|
if (err) {
|
||||||
|
monitoring.promMetrics('DELETE', bucketName, err.code,
|
||||||
|
'multiObjectDelete');
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, ...results);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,224 +481,318 @@ function getObjMetadataAndDelete(authInfo, canonicalID, request,
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function multiObjectDelete(authInfo, request, log, callback) {
|
function multiObjectDelete(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'multiObjectDelete' });
|
const {
|
||||||
if (!request.post) {
|
cloudserverApiSpan,
|
||||||
monitoring.promMetrics('DELETE', request.bucketName, 400,
|
activeSpan,
|
||||||
'multiObjectDelete');
|
activeTracerContext,
|
||||||
return callback(errors.MissingRequestBodyError);
|
tracer,
|
||||||
}
|
} = oTel;
|
||||||
const md5 = crypto.createHash('md5')
|
activeSpan.addEvent('Entered objectPut() Function');
|
||||||
.update(request.post, 'utf8').digest('base64');
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
if (md5 !== request.headers['content-md5']) {
|
activeTracerContext,
|
||||||
monitoring.promMetrics('DELETE', request.bucketName, 400,
|
cloudserverApiSpan,
|
||||||
'multiObjectDelete');
|
);
|
||||||
return callback(errors.BadDigest);
|
return tracer.startActiveSpan('Validating and Deleting Objects from S3', undefined, cloudserverApiSpanContext, multiObjectDeleteSpan => {
|
||||||
}
|
const multiObjectDeleteSpanContext = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
const bucketName = request.bucketName;
|
multiObjectDeleteSpan,
|
||||||
const canonicalID = authInfo.getCanonicalID();
|
);
|
||||||
|
multiObjectDeleteSpan.setAttributes({
|
||||||
return async.waterfall([
|
'code.function': 'multiObjectDelete()',
|
||||||
function parseXML(next) {
|
'code.filename': 'lib/api/multiObjectDelete.js',
|
||||||
return _parseXml(request.post,
|
'code.lineno': 464,
|
||||||
(err, quietSetting, objects) => {
|
});
|
||||||
if (err || objects.length < 1 || objects.length > 1000) {
|
activeSpan.addEvent('Processing Delete Objects (multi) Request');
|
||||||
return next(errors.MalformedXML);
|
log.debug('processing request', { method: 'multiObjectDelete' });
|
||||||
}
|
if (!request.post) {
|
||||||
return next(null, quietSetting, objects);
|
monitoring.promMetrics('DELETE', request.bucketName, 400,
|
||||||
});
|
'multiObjectDelete');
|
||||||
},
|
activeSpan.recordException(errors.MissingRequestBodyError);
|
||||||
function checkBucketMetadata(quietSetting, objects, next) {
|
multiObjectDeleteSpan.end();
|
||||||
const errorResults = [];
|
return callback(errors.MissingRequestBodyError);
|
||||||
return metadata.getBucket(bucketName, log, (err, bucketMD) => {
|
}
|
||||||
if (err) {
|
const md5 = crypto.createHash('md5')
|
||||||
log.trace('error retrieving bucket metadata',
|
.update(request.post, 'utf8').digest('base64');
|
||||||
{ error: err });
|
if (md5 !== request.headers['content-md5']) {
|
||||||
return next(err);
|
monitoring.promMetrics('DELETE', request.bucketName, 400,
|
||||||
}
|
'multiObjectDelete');
|
||||||
// check whether bucket has transient or deleted flag
|
activeSpan.recordException(errors.BadDigest);
|
||||||
if (bucketShield(bucketMD, 'objectDelete')) {
|
multiObjectDeleteSpan.end();
|
||||||
return next(errors.NoSuchBucket);
|
return callback(errors.BadDigest);
|
||||||
}
|
}
|
||||||
if (!isBucketAuthorized(bucketMD, 'objectDelete', canonicalID, authInfo, log, request,
|
|
||||||
request.actionImplicitDenies)) {
|
const bucketName = request.bucketName;
|
||||||
log.trace("access denied due to bucket acl's");
|
const canonicalID = authInfo.getCanonicalID();
|
||||||
// if access denied at the bucket level, no access for
|
|
||||||
// any of the objects so all results will be error results
|
return async.waterfall([
|
||||||
objects.forEach(entry => {
|
function parseXML(next) {
|
||||||
errorResults.push({
|
activeSpan.addEvent('Parsing XML Request');
|
||||||
entry,
|
return tracer.startActiveSpan('Bucket check for Server side configuration - SSE', undefined, multiObjectDeleteSpanContext, currentSpan => {
|
||||||
error: errors.AccessDenied,
|
currentSpan.setAttributes({
|
||||||
});
|
'code.function': 'multiObjectDelete()',
|
||||||
});
|
'code.filename': 'lib/api/multiObjectDelete.js',
|
||||||
// by sending an empty array as the objects array
|
'code.lineno': 505,
|
||||||
// async.forEachLimit below will not actually
|
});
|
||||||
// make any calls to metadata or data but will continue on
|
return _parseXml(request.post,
|
||||||
// to the next step to build xml
|
(err, quietSetting, objects) => {
|
||||||
return next(null, quietSetting, errorResults, [], bucketMD);
|
if (err || objects.length < 1 || objects.length > 1000) {
|
||||||
}
|
activeSpan.recordException(errors.MalformedXML);
|
||||||
return next(null, quietSetting, errorResults, objects, bucketMD);
|
multiObjectDeleteSpan.end();
|
||||||
});
|
activeSpan.recordException(errors.MalformedXML);
|
||||||
},
|
currentSpan.end();
|
||||||
function checkPolicies(quietSetting, errorResults, objects, bucketMD, next) {
|
multiObjectDeleteSpan.end();
|
||||||
// track keys that are still on track to be deleted
|
return next(errors.MalformedXML);
|
||||||
const inPlay = [];
|
}
|
||||||
// if request from account, no need to check policies
|
activeSpan.setAttributes({
|
||||||
// all objects are inPlay so send array of object keys
|
'aws.s3.objects.count': objects.length,
|
||||||
// as inPlay argument
|
});
|
||||||
if (!authInfo.isRequesterAnIAMUser()) {
|
activeSpan.addEvent('Parsed XML Request');
|
||||||
return next(null, quietSetting, errorResults, objects, bucketMD);
|
currentSpan.end();
|
||||||
}
|
return next(null, quietSetting, objects);
|
||||||
|
});
|
||||||
// TODO: once arsenal's extractParams is separated from doAuth
|
});
|
||||||
// function, refactor so only extract once and send
|
},
|
||||||
// params on to this api
|
function checkBucketMetadata(quietSetting, objects, next) {
|
||||||
const authParams = auth.server.extractParams(request, log,
|
activeSpan.addEvent('Retrieve Bucket Metadata');
|
||||||
's3', request.query);
|
return tracer.startActiveSpan('Checking Bucket Metadata and check if user is authorized', undefined, multiObjectDeleteSpanContext, currentSpan => {
|
||||||
const ip = requestUtils.getClientIp(request, config);
|
const errorResults = [];
|
||||||
const requestContextParams = {
|
return metadata.getBucket(bucketName, log, (err, bucketMD) => {
|
||||||
constantParams: {
|
if (err) {
|
||||||
headers: request.headers,
|
log.trace('error retrieving bucket metadata',
|
||||||
query: request.query,
|
{ error: err });
|
||||||
generalResource: request.bucketName,
|
return next(err);
|
||||||
requesterIp: ip,
|
}
|
||||||
sslEnabled: request.connection.encrypted,
|
activeSpan.addEvent('Retrieved Bucket Metadata');
|
||||||
apiMethod: 'objectDelete',
|
// check whether bucket has transient or deleted flag
|
||||||
awsService: 's3',
|
if (bucketShield(bucketMD, 'objectDelete')) {
|
||||||
locationConstraint: null,
|
return next(errors.NoSuchBucket);
|
||||||
requesterInfo: authInfo,
|
}
|
||||||
signatureVersion: authParams.params.data.authType,
|
activeSpan.addEvent('Checking Bucket Authorization');
|
||||||
authType: authParams.params.data.signatureVersion,
|
if (!isBucketAuthorized(bucketMD, 'objectDelete', canonicalID, authInfo, log, request,
|
||||||
signatureAge: authParams.params.data.signatureAge,
|
request.actionImplicitDenies)) {
|
||||||
},
|
log.trace("access denied due to bucket acl's");
|
||||||
parameterize: {
|
// if access denied at the bucket level, no access for
|
||||||
// eslint-disable-next-line
|
// any of the objects so all results will be error results
|
||||||
specificResource: objects.map(entry => {
|
objects.forEach(entry => {
|
||||||
return {
|
errorResults.push({
|
||||||
key: entry.key,
|
entry,
|
||||||
versionId: entry.versionId,
|
error: errors.AccessDenied,
|
||||||
};
|
});
|
||||||
}),
|
});
|
||||||
},
|
// by sending an empty array as the objects array
|
||||||
};
|
// async.forEachLimit below will not actually
|
||||||
return vault.checkPolicies(requestContextParams, authInfo.getArn(),
|
// make any calls to metadata or data but will continue on
|
||||||
log, (err, authorizationResults) => {
|
// to the next step to build xml
|
||||||
// there were no policies so received a blanket AccessDenied
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
if (err && err.is.AccessDenied) {
|
currentSpan.end();
|
||||||
objects.forEach(entry => {
|
return next(null, quietSetting, errorResults, [], bucketMD);
|
||||||
errorResults.push({
|
}
|
||||||
entry,
|
activeSpan.addEvent('User is authorized to perform object delete on the bucket for all resources');
|
||||||
error: errors.AccessDenied });
|
currentSpan.end();
|
||||||
});
|
return next(null, quietSetting, errorResults, objects, bucketMD);
|
||||||
// send empty array for inPlay
|
});
|
||||||
return next(null, quietSetting, errorResults, [], bucketMD);
|
});
|
||||||
}
|
},
|
||||||
if (err) {
|
function checkPolicies(quietSetting, errorResults, objects, bucketMD, next) {
|
||||||
log.trace('error checking policies', {
|
return tracer.startActiveSpan('Starting Vault Authorization for each object', undefined, multiObjectDeleteSpanContext, currentSpan => {
|
||||||
error: err,
|
const currentSpanContext = opentelemetry.trace.setSpan(
|
||||||
method: 'multiObjectDelete.checkPolicies',
|
multiObjectDeleteSpanContext,
|
||||||
});
|
currentSpan,
|
||||||
return next(err);
|
);
|
||||||
}
|
activeSpan.addEvent('Preparing reqeust context for all objects');
|
||||||
if (objects.length !== authorizationResults.length) {
|
// track keys that are still on track to be deleted
|
||||||
log.error('vault did not return correct number of ' +
|
const inPlay = [];
|
||||||
'authorization results', {
|
// if request from account, no need to check policies
|
||||||
authorizationResultsLength:
|
// all objects are inPlay so send array of object keys
|
||||||
authorizationResults.length,
|
// as inPlay argument
|
||||||
objectsLength: objects.length,
|
if (!authInfo.isRequesterAnIAMUser()) {
|
||||||
});
|
activeSpan.addEvent('Account user no policies need to be checked');
|
||||||
return next(errors.InternalError);
|
currentSpan.end();
|
||||||
}
|
return next(null, quietSetting, errorResults, objects, bucketMD);
|
||||||
// Convert authorization results into an easier to handle format
|
}
|
||||||
const actionImplicitDenies = authorizationResults.reduce((acc, curr, idx) => {
|
|
||||||
const apiMethod = authorizationResults[idx].action;
|
// TODO: once arsenal's extractParams is separated from doAuth
|
||||||
// eslint-disable-next-line no-param-reassign
|
// function, refactor so only extract once and send
|
||||||
acc[apiMethod] = curr.isImplicit;
|
// params on to this api
|
||||||
return acc;
|
const authParams = auth.server.extractParams(request, log,
|
||||||
}, {});
|
's3', request.query, { activeSpan, activeTracerContext: currentSpanContext, tracer });
|
||||||
for (let i = 0; i < authorizationResults.length; i++) {
|
const ip = requestUtils.getClientIp(request, config);
|
||||||
const result = authorizationResults[i];
|
const requestContextParams = {
|
||||||
// result is { isAllowed: true,
|
constantParams: {
|
||||||
// arn: arn:aws:s3:::bucket/object,
|
headers: request.headers,
|
||||||
// versionId: sampleversionId } unless not allowed
|
query: request.query,
|
||||||
// in which case no isAllowed key will be present
|
generalResource: request.bucketName,
|
||||||
const slashIndex = result.arn.indexOf('/');
|
requesterIp: ip,
|
||||||
if (slashIndex === -1) {
|
sslEnabled: request.connection.encrypted,
|
||||||
log.error('wrong arn format from vault');
|
apiMethod: 'objectDelete',
|
||||||
return next(errors.InternalError);
|
awsService: 's3',
|
||||||
}
|
locationConstraint: null,
|
||||||
const entry = {
|
requesterInfo: authInfo,
|
||||||
key: result.arn.slice(slashIndex + 1),
|
signatureVersion: authParams.params.data.authType,
|
||||||
versionId: result.versionId,
|
authType: authParams.params.data.signatureVersion,
|
||||||
};
|
signatureAge: authParams.params.data.signatureAge,
|
||||||
// Deny immediately if there is an explicit deny
|
},
|
||||||
if (!result.isImplicit && !result.isAllowed) {
|
parameterize: {
|
||||||
errorResults.push({
|
// eslint-disable-next-line
|
||||||
entry,
|
specificResource: objects.map(entry => {
|
||||||
error: errors.AccessDenied,
|
return {
|
||||||
});
|
key: entry.key,
|
||||||
continue;
|
versionId: entry.versionId,
|
||||||
}
|
};
|
||||||
|
}),
|
||||||
// Evaluate against the bucket policies
|
},
|
||||||
const areAllActionsAllowed = evaluateBucketPolicyWithIAM(
|
};
|
||||||
bucketMD,
|
activeSpan.addEvent('Request Context prepared for all objects');
|
||||||
Object.keys(actionImplicitDenies),
|
activeSpan.addEvent('Starting Vault Authorization for all objects');
|
||||||
canonicalID,
|
return vault.checkPolicies(requestContextParams, authInfo.getArn(),
|
||||||
authInfo,
|
log, (err, authorizationResults) => {
|
||||||
actionImplicitDenies,
|
activeSpan.addEvent('Vault Authorization completed for all objects');
|
||||||
log,
|
// there were no policies so received a blanket AccessDenied
|
||||||
request);
|
if (err && err.is.AccessDenied) {
|
||||||
|
objects.forEach(entry => {
|
||||||
if (areAllActionsAllowed) {
|
errorResults.push({
|
||||||
inPlay.push(entry);
|
entry,
|
||||||
} else {
|
error: errors.AccessDenied });
|
||||||
errorResults.push({
|
});
|
||||||
entry,
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
error: errors.AccessDenied,
|
currentSpan.end();
|
||||||
});
|
// send empty array for inPlay
|
||||||
}
|
return next(null, quietSetting, errorResults, [], bucketMD);
|
||||||
}
|
}
|
||||||
return next(null, quietSetting, errorResults, inPlay, bucketMD);
|
if (err) {
|
||||||
});
|
log.trace('error checking policies', {
|
||||||
},
|
error: err,
|
||||||
function getObjMetadataAndDeleteStep(quietSetting, errorResults, inPlay,
|
method: 'multiObjectDelete.checkPolicies',
|
||||||
bucket, next) {
|
});
|
||||||
return getObjMetadataAndDelete(authInfo, canonicalID, request,
|
activeSpan.recordException(err);
|
||||||
bucketName, bucket, quietSetting, errorResults, inPlay,
|
currentSpan.end();
|
||||||
log, next);
|
multiObjectDeleteSpan.end();
|
||||||
},
|
return next(err);
|
||||||
], (err, quietSetting, errorResults, numOfObjectsRemoved,
|
}
|
||||||
successfullyDeleted, totalContentLengthDeleted, bucket) => {
|
if (objects.length !== authorizationResults.length) {
|
||||||
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
log.error('vault did not return correct number of ' +
|
||||||
request.method, bucket);
|
'authorization results', {
|
||||||
if (err) {
|
authorizationResultsLength:
|
||||||
monitoring.promMetrics('DELETE', bucketName, err.code,
|
authorizationResults.length,
|
||||||
'multiObjectDelete');
|
objectsLength: objects.length,
|
||||||
return callback(err, null, corsHeaders);
|
});
|
||||||
}
|
activeSpan.recordException(errors.InternalError);
|
||||||
const xml = _formatXML(quietSetting, errorResults,
|
currentSpan.end();
|
||||||
successfullyDeleted);
|
multiObjectDeleteSpan.end();
|
||||||
const deletedKeys = successfullyDeleted.map(item => item.key);
|
return next(errors.InternalError);
|
||||||
const removedDeleteMarkers = successfullyDeleted
|
}
|
||||||
.filter(item => item.isDeleteMarker && item.entry && item.entry.versionId)
|
// Convert authorization results into an easier to handle format
|
||||||
.length;
|
const actionImplicitDenies = authorizationResults.reduce((acc, curr, idx) => {
|
||||||
pushMetric('multiObjectDelete', log, {
|
const apiMethod = authorizationResults[idx].action;
|
||||||
authInfo,
|
// eslint-disable-next-line no-param-reassign
|
||||||
canonicalID: bucket ? bucket.getOwner() : '',
|
acc[apiMethod] = curr.isImplicit;
|
||||||
bucket: bucketName,
|
return acc;
|
||||||
keys: deletedKeys,
|
}, {});
|
||||||
byteLength: Number.parseInt(totalContentLengthDeleted, 10),
|
activeSpan.addEvent('Checking authorization results for all objects')
|
||||||
numberOfObjects: numOfObjectsRemoved,
|
for (let i = 0; i < authorizationResults.length; i++) {
|
||||||
removedDeleteMarkers,
|
const result = authorizationResults[i];
|
||||||
isDelete: true,
|
// result is { isAllowed: true,
|
||||||
|
// arn: arn:aws:s3:::bucket/object,
|
||||||
|
// versionId: sampleversionId } unless not allowed
|
||||||
|
// in which case no isAllowed key will be present
|
||||||
|
const slashIndex = result.arn.indexOf('/');
|
||||||
|
if (slashIndex === -1) {
|
||||||
|
log.error('wrong arn format from vault');
|
||||||
|
return next(errors.InternalError);
|
||||||
|
}
|
||||||
|
const entry = {
|
||||||
|
key: result.arn.slice(slashIndex + 1),
|
||||||
|
versionId: result.versionId,
|
||||||
|
};
|
||||||
|
// Deny immediately if there is an explicit deny
|
||||||
|
if (!result.isImplicit && !result.isAllowed) {
|
||||||
|
errorResults.push({
|
||||||
|
entry,
|
||||||
|
error: errors.AccessDenied,
|
||||||
|
});
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Evaluate against the bucket policies
|
||||||
|
const areAllActionsAllowed = evaluateBucketPolicyWithIAM(
|
||||||
|
bucketMD,
|
||||||
|
Object.keys(actionImplicitDenies),
|
||||||
|
canonicalID,
|
||||||
|
authInfo,
|
||||||
|
actionImplicitDenies,
|
||||||
|
log,
|
||||||
|
request);
|
||||||
|
|
||||||
|
if (areAllActionsAllowed) {
|
||||||
|
inPlay.push(entry);
|
||||||
|
} else {
|
||||||
|
errorResults.push({
|
||||||
|
entry,
|
||||||
|
error: errors.AccessDenied,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Authorization results checked for all objects');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, quietSetting, errorResults, inPlay, bucketMD);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
},
|
||||||
|
function getObjMetadataAndDeleteStep(quietSetting, errorResults, inPlay,
|
||||||
|
bucket, next) {
|
||||||
|
return tracer.startActiveSpan('Getting Object Metadata and Deleting Objects', undefined, multiObjectDeleteSpanContext, currentSpan => {
|
||||||
|
const currentSpanContext = opentelemetry.trace.setSpan(
|
||||||
|
multiObjectDeleteSpanContext,
|
||||||
|
currentSpan,
|
||||||
|
);
|
||||||
|
return getObjMetadataAndDelete(authInfo, canonicalID, request,
|
||||||
|
bucketName, bucket, quietSetting, errorResults, inPlay,
|
||||||
|
log, next, { currentSpan, activeSpan, activeTracerContext: currentSpanContext, tracer });
|
||||||
|
});
|
||||||
|
},
|
||||||
|
], (err, quietSetting, errorResults, numOfObjectsRemoved,
|
||||||
|
successfullyDeleted, totalContentLengthDeleted, bucket) => {
|
||||||
|
activeSpan.addEvent('Processing Delete Objects (multi) Request; objects deleted');
|
||||||
|
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
||||||
|
request.method, bucket);
|
||||||
|
if (err) {
|
||||||
|
monitoring.promMetrics('DELETE', bucketName, err.code,
|
||||||
|
'multiObjectDelete');
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
multiObjectDeleteSpan.end();
|
||||||
|
return callback(err, null, corsHeaders);
|
||||||
|
}
|
||||||
|
const xml = _formatXML(quietSetting, errorResults,
|
||||||
|
successfullyDeleted);
|
||||||
|
const deletedKeys = successfullyDeleted.map(item => item.key);
|
||||||
|
const removedDeleteMarkers = successfullyDeleted
|
||||||
|
.filter(item => item.isDeleteMarker && item.entry && item.entry.versionId)
|
||||||
|
.length;
|
||||||
|
pushMetric('multiObjectDelete', log, {
|
||||||
|
authInfo,
|
||||||
|
canonicalID: bucket ? bucket.getOwner() : '',
|
||||||
|
bucket: bucketName,
|
||||||
|
keys: deletedKeys,
|
||||||
|
byteLength: Number.parseInt(totalContentLengthDeleted, 10),
|
||||||
|
numberOfObjects: numOfObjectsRemoved,
|
||||||
|
removedDeleteMarkers,
|
||||||
|
isDelete: true,
|
||||||
|
});
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.objects.size': totalContentLengthDeleted,
|
||||||
|
'aws.s3.objects.deleted.count': numOfObjectsRemoved,
|
||||||
|
'aws.s3.object.size': totalContentLengthDeleted / numOfObjectsRemoved,
|
||||||
|
});
|
||||||
|
monitoring.promMetrics('DELETE', bucketName, '200',
|
||||||
|
'multiObjectDelete',
|
||||||
|
Number.parseInt(totalContentLengthDeleted, 10), null, null,
|
||||||
|
numOfObjectsRemoved);
|
||||||
|
activeSpan.addEvent('Returning Response');
|
||||||
|
multiObjectDeleteSpan.end();
|
||||||
|
// cloudserverApiSpan.end();
|
||||||
|
return callback(null, xml, corsHeaders);
|
||||||
});
|
});
|
||||||
monitoring.promMetrics('DELETE', bucketName, '200',
|
|
||||||
'multiObjectDelete',
|
|
||||||
Number.parseInt(totalContentLengthDeleted, 10), null, null,
|
|
||||||
numOfObjectsRemoved);
|
|
||||||
return callback(null, xml, corsHeaders);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -202,7 +202,7 @@ function _prepMetadata(request, sourceObjMD, headers, sourceIsDestination,
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectCopy(authInfo, request, sourceBucket,
|
function objectCopy(authInfo, request, sourceBucket,
|
||||||
sourceObject, sourceVersionId, log, callback) {
|
sourceObject, sourceVersionId, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectCopy' });
|
log.debug('processing request', { method: 'objectCopy' });
|
||||||
const destBucketName = request.bucketName;
|
const destBucketName = request.bucketName;
|
||||||
const destObjectKey = request.objectKey;
|
const destObjectKey = request.objectKey;
|
||||||
|
@ -263,7 +263,7 @@ function objectCopy(authInfo, request, sourceBucket,
|
||||||
return next(errors.NoSuchBucket);
|
return next(errors.NoSuchBucket);
|
||||||
}
|
}
|
||||||
return next(null, destBucketMD, destObjMD);
|
return next(null, destBucketMD, destObjMD);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function checkSourceAuthorization(destBucketMD, destObjMD, next) {
|
function checkSourceAuthorization(destBucketMD, destObjMD, next) {
|
||||||
return standardMetadataValidateBucketAndObj(valGetParams, request.actionImplicitDenies, log,
|
return standardMetadataValidateBucketAndObj(valGetParams, request.actionImplicitDenies, log,
|
||||||
|
@ -340,7 +340,7 @@ function objectCopy(authInfo, request, sourceBucket,
|
||||||
return next(null, storeMetadataParams, dataLocator,
|
return next(null, storeMetadataParams, dataLocator,
|
||||||
sourceBucketMD, destBucketMD, destObjMD,
|
sourceBucketMD, destBucketMD, destObjMD,
|
||||||
sourceLocationConstraintName, backendInfoDest);
|
sourceLocationConstraintName, backendInfoDest);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function getSSEConfiguration(storeMetadataParams, dataLocator, sourceBucketMD,
|
function getSSEConfiguration(storeMetadataParams, dataLocator, sourceBucketMD,
|
||||||
destBucketMD, destObjMD, sourceLocationConstraintName,
|
destBucketMD, destObjMD, sourceLocationConstraintName,
|
||||||
|
@ -487,7 +487,7 @@ function objectCopy(authInfo, request, sourceBucket,
|
||||||
return next(null, dataToDelete, result, destBucketMD,
|
return next(null, dataToDelete, result, destBucketMD,
|
||||||
storeMetadataParams, serverSideEncryption,
|
storeMetadataParams, serverSideEncryption,
|
||||||
sourceObjSize, destObjPrevSize);
|
sourceObjSize, destObjPrevSize);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function deleteExistingData(dataToDelete, storingNewMdResult,
|
function deleteExistingData(dataToDelete, storingNewMdResult,
|
||||||
destBucketMD, storeMetadataParams, serverSideEncryption,
|
destBucketMD, storeMetadataParams, serverSideEncryption,
|
||||||
|
|
|
@ -26,9 +26,10 @@ const { overheadField } = require('../../constants');
|
||||||
* includes normalized headers
|
* includes normalized headers
|
||||||
* @param {Logger} log - werelogs request instance
|
* @param {Logger} log - werelogs request instance
|
||||||
* @param {function} cb - final cb to call with the result and response headers
|
* @param {function} cb - final cb to call with the result and response headers
|
||||||
|
* @param {object} oTel - tracing information
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectDelete(authInfo, request, log, cb) {
|
function objectDelete(authInfo, request, log, cb, oTel) {
|
||||||
log.debug('processing request', { method: 'objectDelete' });
|
log.debug('processing request', { method: 'objectDelete' });
|
||||||
if (authInfo.isRequesterPublicUser()) {
|
if (authInfo.isRequesterPublicUser()) {
|
||||||
log.debug('operation not available for public user');
|
log.debug('operation not available for public user');
|
||||||
|
@ -97,7 +98,7 @@ function objectDelete(authInfo, request, log, cb) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return next(null, bucketMD, objMD);
|
return next(null, bucketMD, objMD);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function checkGovernanceBypassHeader(bucketMD, objectMD, next) {
|
function checkGovernanceBypassHeader(bucketMD, objectMD, next) {
|
||||||
// AWS only returns an object lock error if a version id
|
// AWS only returns an object lock error if a version id
|
||||||
|
@ -113,7 +114,7 @@ function objectDelete(authInfo, request, log, cb) {
|
||||||
return next(err, bucketMD);
|
return next(err, bucketMD);
|
||||||
}
|
}
|
||||||
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
||||||
});
|
}, oTel);
|
||||||
}
|
}
|
||||||
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
||||||
},
|
},
|
||||||
|
@ -172,7 +173,7 @@ function objectDelete(authInfo, request, log, cb) {
|
||||||
objectKey, objectMD, authInfo, canonicalID, null, request,
|
objectKey, objectMD, authInfo, canonicalID, null, request,
|
||||||
deleteInfo.newDeleteMarker, null, overheadField, log, (err, newDelMarkerRes) => {
|
deleteInfo.newDeleteMarker, null, overheadField, log, (err, newDelMarkerRes) => {
|
||||||
next(err, bucketMD, objectMD, newDelMarkerRes, deleteInfo);
|
next(err, bucketMD, objectMD, newDelMarkerRes, deleteInfo);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
], (err, bucketMD, objectMD, result, deleteInfo) => {
|
], (err, bucketMD, objectMD, result, deleteInfo) => {
|
||||||
const resHeaders = collectCorsHeaders(request.headers.origin,
|
const resHeaders = collectCorsHeaders(request.headers.origin,
|
||||||
|
@ -254,6 +255,11 @@ function objectDelete(authInfo, request, log, cb) {
|
||||||
location: objectMD.dataStoreName,
|
location: objectMD.dataStoreName,
|
||||||
isDelete: true,
|
isDelete: true,
|
||||||
});
|
});
|
||||||
|
const { activeSpan } = oTel;
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.object.size': objectMD['content-length'],
|
||||||
|
'aws.s3.region': objectMD.dataStoreName,
|
||||||
|
});
|
||||||
monitoring.promMetrics('DELETE', bucketName, '200', 'deleteObject',
|
monitoring.promMetrics('DELETE', bucketName, '200', 'deleteObject',
|
||||||
Number.parseInt(objectMD['content-length'], 10));
|
Number.parseInt(objectMD['content-length'], 10));
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ const REPLICATION_ACTION = 'DELETE_TAGGING';
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectDeleteTagging(authInfo, request, log, callback) {
|
function objectDeleteTagging(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectDeleteTagging' });
|
log.debug('processing request', { method: 'objectDeleteTagging' });
|
||||||
|
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
|
@ -71,7 +71,7 @@ function objectDeleteTagging(authInfo, request, log, callback) {
|
||||||
return next(errors.MethodNotAllowed, bucket);
|
return next(errors.MethodNotAllowed, bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
// eslint-disable-next-line no-param-reassign
|
// eslint-disable-next-line no-param-reassign
|
||||||
objectMD.tags = {};
|
objectMD.tags = {};
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const { errors, s3middleware } = require('arsenal');
|
const { errors, s3middleware } = require('arsenal');
|
||||||
const { parseRange } = require('arsenal').network.http.utils;
|
const { parseRange } = require('arsenal').network.http.utils;
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const { data } = require('../data/wrapper');
|
const { data } = require('../data/wrapper');
|
||||||
|
|
||||||
|
@ -25,236 +26,301 @@ const validateHeaders = s3middleware.validateConditionalHeaders;
|
||||||
* @param {function} callback - callback to function in route
|
* @param {function} callback - callback to function in route
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectGet(authInfo, request, returnTagCount, log, callback) {
|
function objectGet(authInfo, request, returnTagCount, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectGet' });
|
const {
|
||||||
const bucketName = request.bucketName;
|
cloudserverApiSpan,
|
||||||
const objectKey = request.objectKey;
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
const decodedVidResult = decodeVersionId(request.query);
|
tracer,
|
||||||
if (decodedVidResult instanceof Error) {
|
} = oTel;
|
||||||
log.trace('invalid versionId query', {
|
activeSpan.addEvent('Entered objectGet() Function');
|
||||||
versionId: request.query.versionId,
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
error: decodedVidResult,
|
activeTracerContext,
|
||||||
|
cloudserverApiSpan,
|
||||||
|
);
|
||||||
|
return tracer.startActiveSpan('Getting Object MD from S3', undefined, cloudserverApiSpanContext, objectGetSpan => {
|
||||||
|
const objectGetSpanContext = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
|
objectGetSpan,
|
||||||
|
);
|
||||||
|
objectGetSpan.setAttributes({
|
||||||
|
'code.function': 'objectGet()',
|
||||||
|
'code.filename': 'lib/api/objectGet.js',
|
||||||
|
'code.lineno': 29,
|
||||||
});
|
});
|
||||||
return callback(decodedVidResult);
|
activeSpan.addEvent('Processing Object Get Request');
|
||||||
}
|
log.debug('processing request', { method: 'objectGet' });
|
||||||
const versionId = decodedVidResult;
|
const bucketName = request.bucketName;
|
||||||
|
const objectKey = request.objectKey;
|
||||||
|
|
||||||
const mdValParams = {
|
const decodedVidResult = decodeVersionId(request.query);
|
||||||
authInfo,
|
if (decodedVidResult instanceof Error) {
|
||||||
bucketName,
|
activeSpan.recordException(decodedVidResult);
|
||||||
objectKey,
|
objectGetSpan.end();
|
||||||
versionId,
|
log.trace('invalid versionId query', {
|
||||||
getDeleteMarker: true,
|
versionId: request.query.versionId,
|
||||||
requestType: request.apiMethods || 'objectGet',
|
error: decodedVidResult,
|
||||||
request,
|
|
||||||
};
|
|
||||||
|
|
||||||
return standardMetadataValidateBucketAndObj(mdValParams, request.actionImplicitDenies, log,
|
|
||||||
(err, bucket, objMD) => {
|
|
||||||
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
|
||||||
request.method, bucket);
|
|
||||||
if (err) {
|
|
||||||
log.debug('error processing request', {
|
|
||||||
error: err,
|
|
||||||
method: 'metadataValidateBucketAndObj',
|
|
||||||
});
|
});
|
||||||
monitoring.promMetrics(
|
return callback(decodedVidResult);
|
||||||
'GET', bucketName, err.code, 'getObject');
|
|
||||||
return callback(err, null, corsHeaders);
|
|
||||||
}
|
}
|
||||||
if (!objMD) {
|
const versionId = decodedVidResult;
|
||||||
const err = versionId ? errors.NoSuchVersion : errors.NoSuchKey;
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, err.code, 'getObject');
|
|
||||||
return callback(err, null, corsHeaders);
|
|
||||||
}
|
|
||||||
const verCfg = bucket.getVersioningConfiguration();
|
|
||||||
if (objMD.isDeleteMarker) {
|
|
||||||
const responseMetaHeaders = Object.assign({},
|
|
||||||
{ 'x-amz-delete-marker': true }, corsHeaders);
|
|
||||||
if (!versionId) {
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 404, 'getObject');
|
|
||||||
return callback(errors.NoSuchKey, null, responseMetaHeaders);
|
|
||||||
}
|
|
||||||
// return MethodNotAllowed if requesting a specific
|
|
||||||
// version that has a delete marker
|
|
||||||
responseMetaHeaders['x-amz-version-id'] =
|
|
||||||
getVersionIdResHeader(verCfg, objMD);
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 405, 'getObject');
|
|
||||||
return callback(errors.MethodNotAllowed, null,
|
|
||||||
responseMetaHeaders);
|
|
||||||
}
|
|
||||||
const headerValResult = validateHeaders(request.headers,
|
|
||||||
objMD['last-modified'], objMD['content-md5']);
|
|
||||||
if (headerValResult.error) {
|
|
||||||
return callback(headerValResult.error, null, corsHeaders);
|
|
||||||
}
|
|
||||||
const responseMetaHeaders = collectResponseHeaders(objMD,
|
|
||||||
corsHeaders, verCfg, returnTagCount);
|
|
||||||
|
|
||||||
setExpirationHeaders(responseMetaHeaders, {
|
const mdValParams = {
|
||||||
lifecycleConfig: bucket.getLifecycleConfiguration(),
|
authInfo,
|
||||||
objectParams: {
|
bucketName,
|
||||||
key: objectKey,
|
objectKey,
|
||||||
tags: objMD.tags,
|
versionId,
|
||||||
date: objMD['last-modified'],
|
getDeleteMarker: true,
|
||||||
},
|
requestType: request.apiMethods || 'objectGet',
|
||||||
isVersionedReq: !!versionId,
|
request,
|
||||||
});
|
};
|
||||||
|
const mdSpan = tracer.startSpan('Validating Bucket and Object Metadata', undefined, objectGetSpanContext);
|
||||||
const objLength = (objMD.location === null ?
|
const mdSpanContext = opentelemetry.trace.setSpan(
|
||||||
0 : parseInt(objMD['content-length'], 10));
|
objectGetSpanContext,
|
||||||
let byteRange;
|
mdSpan,
|
||||||
const streamingParams = {};
|
);
|
||||||
if (request.headers.range) {
|
return standardMetadataValidateBucketAndObj(mdValParams, request.actionImplicitDenies, log,
|
||||||
const { range, error } = parseRange(request.headers.range,
|
(err, bucket, objMD) => {
|
||||||
objLength);
|
mdSpan.end();
|
||||||
if (error) {
|
activeSpan.addEvent('Bucket and Object Metadata Validation Complete');
|
||||||
monitoring.promMetrics(
|
activeSpan.addEvent('Collecting CORS Headers');
|
||||||
'GET', bucketName, 400, 'getObject');
|
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
||||||
return callback(error, null, corsHeaders);
|
request.method, bucket);
|
||||||
}
|
activeSpan.addEvent('CORS Headers Collected');
|
||||||
responseMetaHeaders['Accept-Ranges'] = 'bytes';
|
|
||||||
if (range) {
|
|
||||||
byteRange = range;
|
|
||||||
// End of range should be included so + 1
|
|
||||||
responseMetaHeaders['Content-Length'] =
|
|
||||||
range[1] - range[0] + 1;
|
|
||||||
responseMetaHeaders['Content-Range'] =
|
|
||||||
`bytes ${range[0]}-${range[1]}/${objLength}`;
|
|
||||||
streamingParams.rangeStart = (range[0] || typeof range[0] === 'number') ?
|
|
||||||
range[0].toString() : undefined;
|
|
||||||
streamingParams.rangeEnd = range[1] ?
|
|
||||||
range[1].toString() : undefined;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let dataLocator = null;
|
|
||||||
if (objMD.location !== null) {
|
|
||||||
// To provide for backwards compatibility before
|
|
||||||
// md-model-version 2, need to handle cases where
|
|
||||||
// objMD.location is just a string
|
|
||||||
dataLocator = Array.isArray(objMD.location) ?
|
|
||||||
objMD.location : [{ key: objMD.location }];
|
|
||||||
// if the data backend is azure, there will only ever be at
|
|
||||||
// most one item in the dataLocator array
|
|
||||||
if (dataLocator[0] && dataLocator[0].dataStoreType === 'azure') {
|
|
||||||
dataLocator[0].azureStreamingOptions = streamingParams;
|
|
||||||
}
|
|
||||||
|
|
||||||
let partNumber = null;
|
|
||||||
if (request.query && request.query.partNumber !== undefined) {
|
|
||||||
if (byteRange) {
|
|
||||||
const error = errors.InvalidRequest
|
|
||||||
.customizeDescription('Cannot specify both Range ' +
|
|
||||||
'header and partNumber query parameter.');
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 400, 'getObject');
|
|
||||||
return callback(error, null, corsHeaders);
|
|
||||||
}
|
|
||||||
partNumber = Number.parseInt(request.query.partNumber, 10);
|
|
||||||
if (Number.isNaN(partNumber)) {
|
|
||||||
const error = errors.InvalidArgument
|
|
||||||
.customizeDescription('Part number must be a number.');
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 400, 'getObject');
|
|
||||||
return callback(error, null, corsHeaders);
|
|
||||||
}
|
|
||||||
if (partNumber < 1 || partNumber > 10000) {
|
|
||||||
const error = errors.InvalidArgument
|
|
||||||
.customizeDescription('Part number must be an ' +
|
|
||||||
'integer between 1 and 10000, inclusive.');
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 400, 'getObject');
|
|
||||||
return callback(error, null, corsHeaders);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// If have a data model before version 2, cannot support
|
|
||||||
// get range for objects with multiple parts
|
|
||||||
if (byteRange && dataLocator.length > 1 &&
|
|
||||||
dataLocator[0].start === undefined) {
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 501, 'getObject');
|
|
||||||
return callback(errors.NotImplemented, null, corsHeaders);
|
|
||||||
}
|
|
||||||
if (objMD['x-amz-server-side-encryption']) {
|
|
||||||
for (let i = 0; i < dataLocator.length; i++) {
|
|
||||||
dataLocator[i].masterKeyId =
|
|
||||||
objMD['x-amz-server-side-encryption-aws-kms-key-id'];
|
|
||||||
dataLocator[i].algorithm =
|
|
||||||
objMD['x-amz-server-side-encryption'];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (partNumber) {
|
|
||||||
const locations = [];
|
|
||||||
let locationPartNumber;
|
|
||||||
for (let i = 0; i < objMD.location.length; i++) {
|
|
||||||
const { dataStoreETag } = objMD.location[i];
|
|
||||||
|
|
||||||
if (dataStoreETag) {
|
|
||||||
locationPartNumber =
|
|
||||||
Number.parseInt(dataStoreETag.split(':')[0], 10);
|
|
||||||
} else {
|
|
||||||
/**
|
|
||||||
* Location objects prior to GA7.1 do not include the
|
|
||||||
* dataStoreETag field so we cannot find the part range,
|
|
||||||
* the objects are treated as if they only have 1 part
|
|
||||||
*/
|
|
||||||
locationPartNumber = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get all parts that belong to the requested part number
|
|
||||||
if (partNumber === locationPartNumber) {
|
|
||||||
locations.push(objMD.location[i]);
|
|
||||||
} else if (locationPartNumber > partNumber) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (locations.length === 0) {
|
|
||||||
monitoring.promMetrics(
|
|
||||||
'GET', bucketName, 400, 'getObject');
|
|
||||||
return callback(errors.InvalidPartNumber, null,
|
|
||||||
corsHeaders);
|
|
||||||
}
|
|
||||||
const { start } = locations[0];
|
|
||||||
const endLocation = locations[locations.length - 1];
|
|
||||||
const end = endLocation.start + endLocation.size - 1;
|
|
||||||
responseMetaHeaders['Content-Length'] = end - start + 1;
|
|
||||||
const partByteRange = [start, end];
|
|
||||||
dataLocator = setPartRanges(dataLocator, partByteRange);
|
|
||||||
const partsCount = getPartCountFromMd5(objMD);
|
|
||||||
if (partsCount) {
|
|
||||||
responseMetaHeaders['x-amz-mp-parts-count'] =
|
|
||||||
partsCount;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
dataLocator = setPartRanges(dataLocator, byteRange);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return data.head(dataLocator, log, err => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
log.error('error from external backend checking for ' +
|
log.debug('error processing request', {
|
||||||
'object existence', { error: err });
|
error: err,
|
||||||
|
method: 'metadataValidateBucketAndObj',
|
||||||
|
});
|
||||||
monitoring.promMetrics(
|
monitoring.promMetrics(
|
||||||
'GET', bucketName, err.code, 'getObject');
|
'GET', bucketName, err.code, 'getObject');
|
||||||
return callback(err);
|
activeSpan.recordException(err);
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(err, null, corsHeaders);
|
||||||
}
|
}
|
||||||
pushMetric('getObject', log, {
|
activeSpan.addEvent('Bucket Policy Validation Passed');
|
||||||
authInfo,
|
if (!objMD) {
|
||||||
bucket: bucketName,
|
const err = versionId ? errors.NoSuchVersion : errors.NoSuchKey;
|
||||||
keys: [objectKey],
|
monitoring.promMetrics(
|
||||||
newByteLength:
|
'GET', bucketName, err.code, 'getObject');
|
||||||
Number.parseInt(responseMetaHeaders['Content-Length'], 10),
|
activeSpan.recordException(errors.NoSuchBucket);
|
||||||
versionId: objMD.versionId,
|
objectGetSpan.end();
|
||||||
location: objMD.dataStoreName,
|
return callback(err, null, corsHeaders);
|
||||||
|
}
|
||||||
|
const verCfg = bucket.getVersioningConfiguration();
|
||||||
|
activeSpan.addEvent('Got Versioning Configuration');
|
||||||
|
if (objMD.isDeleteMarker) {
|
||||||
|
const responseMetaHeaders = Object.assign({},
|
||||||
|
{ 'x-amz-delete-marker': true }, corsHeaders);
|
||||||
|
if (!versionId) {
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 404, 'getObject');
|
||||||
|
activeSpan.recordException(errors.NoSuchKey);
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(errors.NoSuchKey, null, responseMetaHeaders);
|
||||||
|
}
|
||||||
|
// return MethodNotAllowed if requesting a specific
|
||||||
|
// version that has a delete marker
|
||||||
|
responseMetaHeaders['x-amz-version-id'] =
|
||||||
|
getVersionIdResHeader(verCfg, objMD);
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 405, 'getObject');
|
||||||
|
activeSpan.recordException(errors.MethodNotAllowed);
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(errors.MethodNotAllowed, null,
|
||||||
|
responseMetaHeaders);
|
||||||
|
}
|
||||||
|
const headerValResult = validateHeaders(request.headers,
|
||||||
|
objMD['last-modified'], objMD['content-md5']);
|
||||||
|
if (headerValResult.error) {
|
||||||
|
activeSpan.recordException(headerValResult.error);
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(headerValResult.error, null, corsHeaders);
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Validated Request Headers against last-modified date of object and/or ETag');
|
||||||
|
const responseMetaHeaders = collectResponseHeaders(objMD,
|
||||||
|
corsHeaders, verCfg, returnTagCount);
|
||||||
|
activeSpan.addEvent('Collected Response Headers from Object Metadata');
|
||||||
|
setExpirationHeaders(responseMetaHeaders, {
|
||||||
|
lifecycleConfig: bucket.getLifecycleConfiguration(),
|
||||||
|
objectParams: {
|
||||||
|
key: objectKey,
|
||||||
|
tags: objMD.tags,
|
||||||
|
date: objMD['last-modified'],
|
||||||
|
},
|
||||||
|
isVersionedReq: !!versionId,
|
||||||
});
|
});
|
||||||
monitoring.promMetrics('GET', bucketName, '200', 'getObject',
|
activeSpan.addEvent('Expiration Headers Set');
|
||||||
Number.parseInt(responseMetaHeaders['Content-Length'], 10));
|
const objLength = (objMD.location === null ?
|
||||||
return callback(null, dataLocator, responseMetaHeaders,
|
0 : parseInt(objMD['content-length'], 10));
|
||||||
byteRange);
|
let byteRange;
|
||||||
});
|
const streamingParams = {};
|
||||||
|
if (request.headers.range) {
|
||||||
|
const { range, error } = parseRange(request.headers.range,
|
||||||
|
objLength);
|
||||||
|
if (error) {
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 400, 'getObject');
|
||||||
|
activeSpan.recordException(error);
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(error, null, corsHeaders);
|
||||||
|
}
|
||||||
|
responseMetaHeaders['Accept-Ranges'] = 'bytes';
|
||||||
|
if (range) {
|
||||||
|
byteRange = range;
|
||||||
|
// End of range should be included so + 1
|
||||||
|
responseMetaHeaders['Content-Length'] =
|
||||||
|
range[1] - range[0] + 1;
|
||||||
|
responseMetaHeaders['Content-Range'] =
|
||||||
|
`bytes ${range[0]}-${range[1]}/${objLength}`;
|
||||||
|
streamingParams.rangeStart = (range[0] || typeof range[0] === 'number') ?
|
||||||
|
range[0].toString() : undefined;
|
||||||
|
streamingParams.rangeEnd = range[1] ?
|
||||||
|
range[1].toString() : undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let dataLocator = null;
|
||||||
|
if (objMD.location !== null) {
|
||||||
|
// To provide for backwards compatibility before
|
||||||
|
// md-model-version 2, need to handle cases where
|
||||||
|
// objMD.location is just a string
|
||||||
|
dataLocator = Array.isArray(objMD.location) ?
|
||||||
|
objMD.location : [{ key: objMD.location }];
|
||||||
|
// if the data backend is azure, there will only ever be at
|
||||||
|
// most one item in the dataLocator array
|
||||||
|
if (dataLocator[0] && dataLocator[0].dataStoreType === 'azure') {
|
||||||
|
dataLocator[0].azureStreamingOptions = streamingParams;
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Data Locator Set');
|
||||||
|
|
||||||
|
let partNumber = null;
|
||||||
|
if (request.query && request.query.partNumber !== undefined) {
|
||||||
|
if (byteRange) {
|
||||||
|
const error = errors.InvalidRequest
|
||||||
|
.customizeDescription('Cannot specify both Range ' +
|
||||||
|
'header and partNumber query parameter.');
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 400, 'getObject');
|
||||||
|
return callback(error, null, corsHeaders);
|
||||||
|
}
|
||||||
|
partNumber = Number.parseInt(request.query.partNumber, 10);
|
||||||
|
if (Number.isNaN(partNumber)) {
|
||||||
|
const error = errors.InvalidArgument
|
||||||
|
.customizeDescription('Part number must be a number.');
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 400, 'getObject');
|
||||||
|
return callback(error, null, corsHeaders);
|
||||||
|
}
|
||||||
|
if (partNumber < 1 || partNumber > 10000) {
|
||||||
|
const error = errors.InvalidArgument
|
||||||
|
.customizeDescription('Part number must be an ' +
|
||||||
|
'integer between 1 and 10000, inclusive.');
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 400, 'getObject');
|
||||||
|
return callback(error, null, corsHeaders);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// If have a data model before version 2, cannot support
|
||||||
|
// get range for objects with multiple parts
|
||||||
|
if (byteRange && dataLocator.length > 1 &&
|
||||||
|
dataLocator[0].start === undefined) {
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 501, 'getObject');
|
||||||
|
return callback(errors.NotImplemented, null, corsHeaders);
|
||||||
|
}
|
||||||
|
if (objMD['x-amz-server-side-encryption']) {
|
||||||
|
for (let i = 0; i < dataLocator.length; i++) {
|
||||||
|
dataLocator[i].masterKeyId =
|
||||||
|
objMD['x-amz-server-side-encryption-aws-kms-key-id'];
|
||||||
|
dataLocator[i].algorithm =
|
||||||
|
objMD['x-amz-server-side-encryption'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Retrieved Server-Side Encryption Configuration')
|
||||||
|
if (partNumber) {
|
||||||
|
const locations = [];
|
||||||
|
let locationPartNumber;
|
||||||
|
for (let i = 0; i < objMD.location.length; i++) {
|
||||||
|
const { dataStoreETag } = objMD.location[i];
|
||||||
|
|
||||||
|
if (dataStoreETag) {
|
||||||
|
locationPartNumber =
|
||||||
|
Number.parseInt(dataStoreETag.split(':')[0], 10);
|
||||||
|
} else {
|
||||||
|
/**
|
||||||
|
* Location objects prior to GA7.1 do not include the
|
||||||
|
* dataStoreETag field so we cannot find the part range,
|
||||||
|
* the objects are treated as if they only have 1 part
|
||||||
|
*/
|
||||||
|
locationPartNumber = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all parts that belong to the requested part number
|
||||||
|
if (partNumber === locationPartNumber) {
|
||||||
|
locations.push(objMD.location[i]);
|
||||||
|
} else if (locationPartNumber > partNumber) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (locations.length === 0) {
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, 400, 'getObject');
|
||||||
|
return callback(errors.InvalidPartNumber, null,
|
||||||
|
corsHeaders);
|
||||||
|
}
|
||||||
|
const { start } = locations[0];
|
||||||
|
const endLocation = locations[locations.length - 1];
|
||||||
|
const end = endLocation.start + endLocation.size - 1;
|
||||||
|
responseMetaHeaders['Content-Length'] = end - start + 1;
|
||||||
|
const partByteRange = [start, end];
|
||||||
|
dataLocator = setPartRanges(dataLocator, partByteRange);
|
||||||
|
const partsCount = getPartCountFromMd5(objMD);
|
||||||
|
if (partsCount) {
|
||||||
|
responseMetaHeaders['x-amz-mp-parts-count'] =
|
||||||
|
partsCount;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
dataLocator = setPartRanges(dataLocator, byteRange);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return tracer.startActiveSpan('Performing head request to locate data', undefined, objectGetSpanContext, dataLocatorSpan => {
|
||||||
|
dataLocatorSpan.setAttributes({
|
||||||
|
'code.function': 'objectGet()',
|
||||||
|
'code.filename': 'lib/api/objectGet.js',
|
||||||
|
'code.lineno': 289,
|
||||||
|
});
|
||||||
|
return data.head(dataLocator, log, err => {
|
||||||
|
dataLocatorSpan.end();
|
||||||
|
if (err) {
|
||||||
|
log.error('error from external backend checking for ' +
|
||||||
|
'object existence', { error: err });
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', bucketName, err.code, 'getObject');
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
pushMetric('getObject', log, {
|
||||||
|
authInfo,
|
||||||
|
bucket: bucketName,
|
||||||
|
keys: [objectKey],
|
||||||
|
newByteLength:
|
||||||
|
Number.parseInt(responseMetaHeaders['Content-Length'], 10),
|
||||||
|
versionId: objMD.versionId,
|
||||||
|
location: objMD.dataStoreName,
|
||||||
|
});
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.object.size': Number.parseInt(responseMetaHeaders['Content-Length'], 10),
|
||||||
|
'aws.s3.region': objMD.dataStoreName,
|
||||||
|
'aws.s3.object.versionId': objMD.versionId,
|
||||||
|
});
|
||||||
|
monitoring.promMetrics('GET', bucketName, '200', 'getObject',
|
||||||
|
Number.parseInt(responseMetaHeaders['Content-Length'], 10));
|
||||||
|
activeSpan.addEvent('Exiting objectGet() Function');
|
||||||
|
objectGetSpan.end();
|
||||||
|
return callback(null, dataLocator, responseMetaHeaders,
|
||||||
|
byteRange);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}, { activeSpan, activeTracerContext: mdSpanContext, tracer });
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ const monitoring = require('../utilities/metrics');
|
||||||
* @param {function} callback - callback to respond to http request
|
* @param {function} callback - callback to respond to http request
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectGetACL(authInfo, request, log, callback) {
|
function objectGetACL(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectGetACL' });
|
log.debug('processing request', { method: 'objectGetACL' });
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
const objectKey = request.objectKey;
|
const objectKey = request.objectKey;
|
||||||
|
@ -103,7 +103,7 @@ function objectGetACL(authInfo, request, log, callback) {
|
||||||
return next(errors.NoSuchKey);
|
return next(errors.NoSuchKey);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function gatherACLs(bucket, objectMD, next) {
|
function gatherACLs(bucket, objectMD, next) {
|
||||||
const verCfg = bucket.getVersioningConfiguration();
|
const verCfg = bucket.getVersioningConfiguration();
|
||||||
|
|
|
@ -18,7 +18,7 @@ const { convertToXml } = s3middleware.objectLegalHold;
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectGetLegalHold(authInfo, request, log, callback) {
|
function objectGetLegalHold(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectGetLegalHold' });
|
log.debug('processing request', { method: 'objectGetLegalHold' });
|
||||||
|
|
||||||
const { bucketName, objectKey, query } = request;
|
const { bucketName, objectKey, query } = request;
|
||||||
|
@ -80,7 +80,7 @@ function objectGetLegalHold(authInfo, request, log, callback) {
|
||||||
'Bucket is missing Object Lock Configuration'));
|
'Bucket is missing Object Lock Configuration'));
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
const { legalHold } = objectMD;
|
const { legalHold } = objectMD;
|
||||||
const xml = convertToXml(legalHold);
|
const xml = convertToXml(legalHold);
|
||||||
|
|
|
@ -18,7 +18,7 @@ const { convertToXml } = s3middleware.retention;
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectGetRetention(authInfo, request, log, callback) {
|
function objectGetRetention(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectGetRetention' });
|
log.debug('processing request', { method: 'objectGetRetention' });
|
||||||
|
|
||||||
const { bucketName, objectKey } = request;
|
const { bucketName, objectKey } = request;
|
||||||
|
@ -80,7 +80,7 @@ function objectGetRetention(authInfo, request, log, callback) {
|
||||||
'Bucket is missing Object Lock Configuration'));
|
'Bucket is missing Object Lock Configuration'));
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
const { retentionMode, retentionDate } = objectMD;
|
const { retentionMode, retentionDate } = objectMD;
|
||||||
if (!retentionMode || !retentionDate) {
|
if (!retentionMode || !retentionDate) {
|
||||||
|
|
|
@ -18,7 +18,7 @@ const monitoring = require('../utilities/metrics');
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectGetTagging(authInfo, request, log, callback) {
|
function objectGetTagging(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectGetTagging' });
|
log.debug('processing request', { method: 'objectGetTagging' });
|
||||||
|
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
|
@ -75,7 +75,7 @@ function objectGetTagging(authInfo, request, log, callback) {
|
||||||
return next(errors.NoSuchKey);
|
return next(errors.NoSuchKey);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
const tags = objectMD.tags;
|
const tags = objectMD.tags;
|
||||||
const xml = convertToXml(tags);
|
const xml = convertToXml(tags);
|
||||||
|
|
|
@ -22,10 +22,11 @@ const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
|
||||||
* @param {object} request - normalized request object
|
* @param {object} request - normalized request object
|
||||||
* @param {object} log - Werelogs logger
|
* @param {object} log - Werelogs logger
|
||||||
* @param {function} callback - callback to function in route
|
* @param {function} callback - callback to function in route
|
||||||
|
* @param {object} oTel - OpenTelemetry activeSpan, context and Tracer object
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
function objectHead(authInfo, request, log, callback) {
|
function objectHead(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectHead' });
|
log.debug('processing request', { method: 'objectHead' });
|
||||||
const bucketName = request.bucketName;
|
const bucketName = request.bucketName;
|
||||||
const objectKey = request.objectKey;
|
const objectKey = request.objectKey;
|
||||||
|
@ -158,9 +159,14 @@ function objectHead(authInfo, request, log, callback) {
|
||||||
versionId: objMD ? objMD.versionId : undefined,
|
versionId: objMD ? objMD.versionId : undefined,
|
||||||
location: objMD ? objMD.dataStoreName : undefined,
|
location: objMD ? objMD.dataStoreName : undefined,
|
||||||
});
|
});
|
||||||
|
const { activeSpan } = oTel;
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
// 'aws.s3.action': 'objectHead',
|
||||||
|
'aws.s3.region': objMD ? objMD.dataStoreName : undefined,
|
||||||
|
});
|
||||||
monitoring.promMetrics('HEAD', bucketName, '200', 'headObject');
|
monitoring.promMetrics('HEAD', bucketName, '200', 'headObject');
|
||||||
return callback(null, responseHeaders);
|
return callback(null, responseHeaders);
|
||||||
});
|
}, oTel);
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = objectHead;
|
module.exports = objectHead;
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { errors, versioning } = require('arsenal');
|
const { errors, versioning } = require('arsenal');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const aclUtils = require('../utilities/aclUtils');
|
const aclUtils = require('../utilities/aclUtils');
|
||||||
const { cleanUpBucket } = require('./apiUtils/bucket/bucketCreation');
|
const { cleanUpBucket } = require('./apiUtils/bucket/bucketCreation');
|
||||||
|
@ -39,171 +40,304 @@ const versionIdUtils = versioning.VersionID;
|
||||||
* (to be used for streaming v4 auth if applicable)
|
* (to be used for streaming v4 auth if applicable)
|
||||||
* @param {object} log - the log request
|
* @param {object} log - the log request
|
||||||
* @param {Function} callback - final callback to call with the result
|
* @param {Function} callback - final callback to call with the result
|
||||||
|
* @param {object} authorizationResults - authorization results from
|
||||||
|
* @param {object} oTel - OpenTelemetry methods
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
function objectPut(authInfo, request, streamingV4Params, log, callback, authorizationResults, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPut' });
|
|
||||||
const {
|
const {
|
||||||
bucketName,
|
cloudserverApiSpan,
|
||||||
headers,
|
activeSpan,
|
||||||
method,
|
activeTracerContext,
|
||||||
objectKey,
|
tracer,
|
||||||
parsedContentLength,
|
} = oTel;
|
||||||
query,
|
activeSpan.addEvent('Entered objectPut() Function');
|
||||||
} = request;
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
if (!aclUtils.checkGrantHeaderValidity(headers)) {
|
activeTracerContext,
|
||||||
log.trace('invalid acl header');
|
cloudserverApiSpan,
|
||||||
monitoring.promMetrics('PUT', request.bucketName, 400,
|
);
|
||||||
'putObject');
|
return tracer.startActiveSpan('Validating and Updating Object MD in S3', undefined, cloudserverApiSpanContext, objectPutSpan => {
|
||||||
return callback(errors.InvalidArgument);
|
const objectPutSpanContext = opentelemetry.trace.setSpan(
|
||||||
}
|
activeTracerContext,
|
||||||
const queryContainsVersionId = checkQueryVersionId(query);
|
objectPutSpan,
|
||||||
if (queryContainsVersionId instanceof Error) {
|
);
|
||||||
return callback(queryContainsVersionId);
|
objectPutSpan.setAttributes({
|
||||||
}
|
'code.function': 'objectPut()',
|
||||||
const invalidSSEError = errors.InvalidArgument.customizeDescription(
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
'The encryption method specified is not supported');
|
'code.lineno': 45,
|
||||||
const requestType = request.apiMethods || 'objectPut';
|
});
|
||||||
const valParams = { authInfo, bucketName, objectKey, requestType, request };
|
log.debug('processing request', { method: 'objectPut' });
|
||||||
const canonicalID = authInfo.getCanonicalID();
|
activeSpan.addEvent('Processing Object Put Request');
|
||||||
|
const {
|
||||||
if (hasNonPrintables(objectKey)) {
|
bucketName,
|
||||||
return callback(errors.InvalidInput.customizeDescription(
|
headers,
|
||||||
'object keys cannot contain non-printable characters',
|
method,
|
||||||
));
|
objectKey,
|
||||||
}
|
parsedContentLength,
|
||||||
|
query,
|
||||||
const checksumHeaderErr = validateChecksumHeaders(headers);
|
} = request;
|
||||||
if (checksumHeaderErr) {
|
if (!aclUtils.checkGrantHeaderValidity(headers)) {
|
||||||
return callback(checksumHeaderErr);
|
activeSpan.recordException(errors.InvalidArgument);
|
||||||
}
|
objectPutSpan.end();
|
||||||
|
log.trace('invalid acl header');
|
||||||
log.trace('owner canonicalID to send to data', { canonicalID });
|
monitoring.promMetrics('PUT', request.bucketName, 400,
|
||||||
|
'putObject');
|
||||||
return standardMetadataValidateBucketAndObj(valParams, request.actionImplicitDenies, log,
|
return callback(errors.InvalidArgument);
|
||||||
(err, bucket, objMD) => {
|
|
||||||
const responseHeaders = collectCorsHeaders(headers.origin,
|
|
||||||
method, bucket);
|
|
||||||
if (err) {
|
|
||||||
log.trace('error processing request', {
|
|
||||||
error: err,
|
|
||||||
method: 'metadataValidateBucketAndObj',
|
|
||||||
});
|
|
||||||
monitoring.promMetrics('PUT', bucketName, err.code, 'putObject');
|
|
||||||
return callback(err, responseHeaders);
|
|
||||||
}
|
}
|
||||||
if (bucket.hasDeletedFlag() && canonicalID !== bucket.getOwner()) {
|
const queryContainsVersionId = checkQueryVersionId(query);
|
||||||
log.trace('deleted flag on bucket and request ' +
|
if (queryContainsVersionId instanceof Error) {
|
||||||
'from non-owner account');
|
activeSpan.recordException(queryContainsVersionId);
|
||||||
monitoring.promMetrics('PUT', bucketName, 404, 'putObject');
|
objectPutSpan.end();
|
||||||
return callback(errors.NoSuchBucket);
|
return callback(queryContainsVersionId);
|
||||||
|
}
|
||||||
|
const invalidSSEError = errors.InvalidArgument.customizeDescription(
|
||||||
|
'The encryption method specified is not supported');
|
||||||
|
const requestType = request.apiMethods || 'objectPut';
|
||||||
|
const valParams = { authInfo, bucketName, objectKey, requestType, request };
|
||||||
|
const canonicalID = authInfo.getCanonicalID();
|
||||||
|
|
||||||
|
if (hasNonPrintables(objectKey)) {
|
||||||
|
activeSpan.recordException(errors.InvalidInput);
|
||||||
|
objectPutSpan.end();
|
||||||
|
return callback(errors.InvalidInput.customizeDescription(
|
||||||
|
'object keys cannot contain non-printable characters',
|
||||||
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
return async.waterfall([
|
const checksumHeaderErr = validateChecksumHeaders(headers);
|
||||||
function handleTransientOrDeleteBuckets(next) {
|
if (checksumHeaderErr) {
|
||||||
if (bucket.hasTransientFlag() || bucket.hasDeletedFlag()) {
|
activeSpan.recordException(checksumHeaderErr);
|
||||||
return cleanUpBucket(bucket, canonicalID, log, next);
|
objectPutSpan.end();
|
||||||
}
|
return callback(checksumHeaderErr);
|
||||||
return next();
|
}
|
||||||
},
|
|
||||||
function getSSEConfig(next) {
|
log.trace('owner canonicalID to send to data', { canonicalID });
|
||||||
return getObjectSSEConfiguration(headers, bucket, log,
|
const mdSpan = tracer.startSpan('Validating Bucket and Object Metadata', undefined, objectPutSpanContext);
|
||||||
(err, sseConfig) => {
|
const mdSpanContext = opentelemetry.trace.setSpan(
|
||||||
if (err) {
|
objectPutSpanContext,
|
||||||
log.error('error getting server side encryption config', { err });
|
mdSpan,
|
||||||
return next(invalidSSEError);
|
);
|
||||||
}
|
return standardMetadataValidateBucketAndObj(valParams, request.actionImplicitDenies, log,
|
||||||
return next(null, sseConfig);
|
(err, bucket, objMD) => {
|
||||||
}
|
mdSpan.end();
|
||||||
);
|
activeSpan.addEvent('Bucket and Object Metadata Validation Complete');
|
||||||
},
|
activeSpan.addEvent('Collecting CORS Headers');
|
||||||
function createCipherBundle(serverSideEncryptionConfig, next) {
|
const responseHeaders = collectCorsHeaders(headers.origin,
|
||||||
if (serverSideEncryptionConfig) {
|
method, bucket);
|
||||||
return kms.createCipherBundle(
|
activeSpan.addEvent('CORS Headers Collected');
|
||||||
serverSideEncryptionConfig, log, next);
|
|
||||||
}
|
|
||||||
return next(null, null);
|
|
||||||
},
|
|
||||||
function objectCreateAndStore(cipherBundle, next) {
|
|
||||||
const objectLockValidationError
|
|
||||||
= validateHeaders(bucket, headers, log);
|
|
||||||
if (objectLockValidationError) {
|
|
||||||
return next(objectLockValidationError);
|
|
||||||
}
|
|
||||||
writeContinue(request, request._response);
|
|
||||||
return createAndStoreObject(bucketName,
|
|
||||||
bucket, objectKey, objMD, authInfo, canonicalID, cipherBundle,
|
|
||||||
request, false, streamingV4Params, overheadField, log, next);
|
|
||||||
},
|
|
||||||
], (err, storingResult) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
monitoring.promMetrics('PUT', bucketName, err.code,
|
log.trace('error processing request', {
|
||||||
'putObject');
|
error: err,
|
||||||
|
method: 'metadataValidateBucketAndObj',
|
||||||
|
});
|
||||||
|
monitoring.promMetrics('PUT', bucketName, err.code, 'putObject');
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
objectPutSpan.end();
|
||||||
return callback(err, responseHeaders);
|
return callback(err, responseHeaders);
|
||||||
}
|
}
|
||||||
// ingestSize assumes that these custom headers indicate
|
activeSpan.addEvent('Bucket Policy Validation Passed');
|
||||||
// an ingestion PUT which is a metadata only operation.
|
if (bucket.hasDeletedFlag() && canonicalID !== bucket.getOwner()) {
|
||||||
// Since these headers can be modified client side, they
|
log.trace('deleted flag on bucket and request ' +
|
||||||
// should be used with caution if needed for precise
|
'from non-owner account');
|
||||||
// metrics.
|
monitoring.promMetrics('PUT', bucketName, 404, 'putObject');
|
||||||
const ingestSize = (request.headers['x-amz-meta-mdonly']
|
activeSpan.recordException(errors.NoSuchBucket);
|
||||||
&& !Number.isNaN(request.headers['x-amz-meta-size']))
|
objectPutSpan.end();
|
||||||
? Number.parseInt(request.headers['x-amz-meta-size'], 10) : null;
|
return callback(errors.NoSuchBucket);
|
||||||
const newByteLength = parsedContentLength;
|
}
|
||||||
|
|
||||||
setExpirationHeaders(responseHeaders, {
|
return async.waterfall([
|
||||||
lifecycleConfig: bucket.getLifecycleConfiguration(),
|
function handleTransientOrDeleteBuckets(next) {
|
||||||
objectParams: {
|
if (bucket.hasTransientFlag() || bucket.hasDeletedFlag()) {
|
||||||
key: objectKey,
|
activeSpan.addEvent('Bucket in Transient or Deleted State');
|
||||||
date: storingResult.lastModified,
|
return tracer.startActiveSpan('Checking Server-Side Encryption Configuration', undefined, objectPutSpanContext, currentSpan => {
|
||||||
tags: storingResult.tags,
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'objectPut()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 168,
|
||||||
|
});
|
||||||
|
return cleanUpBucket(bucket, log, err => {
|
||||||
|
activeSpan.addEvent('Bucket Cleanup Complete');
|
||||||
|
currentSpan.end();
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
objectPutSpan.end();
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return next(null);
|
||||||
},
|
},
|
||||||
});
|
next => tracer.startActiveSpan('Bucket check for Server side configuration - SSE', undefined, objectPutSpanContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
// Utapi expects null or a number for oldByteLength:
|
'code.function': 'objectPut()',
|
||||||
// * null - new object
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
// * 0 or > 0 - existing object with content-length 0 or > 0
|
'code.lineno': 178,
|
||||||
// objMD here is the master version that we would
|
});
|
||||||
// have overwritten if there was an existing version or object
|
return next(null, currentSpan);
|
||||||
//
|
}),
|
||||||
// TODO: Handle utapi metrics for null version overwrites.
|
function getSSEConfig(currentSpan, next) {
|
||||||
const oldByteLength = objMD && objMD['content-length']
|
activeSpan.addEvent('Retrieving Server-Side Encryption Configuration');
|
||||||
!== undefined ? objMD['content-length'] : null;
|
return getObjectSSEConfiguration(headers, bucket, log,
|
||||||
if (storingResult) {
|
(err, sseConfig) => {
|
||||||
// ETag's hex should always be enclosed in quotes
|
if (err) {
|
||||||
responseHeaders.ETag = `"${storingResult.contentMD5}"`;
|
log.error('error getting server side encryption config', { err });
|
||||||
}
|
activeSpan.recordException(invalidSSEError);
|
||||||
const vcfg = bucket.getVersioningConfiguration();
|
currentSpan.end();
|
||||||
const isVersionedObj = vcfg && vcfg.Status === 'Enabled';
|
objectPutSpan.end();
|
||||||
if (isVersionedObj) {
|
return next(invalidSSEError);
|
||||||
if (storingResult && storingResult.versionId) {
|
}
|
||||||
responseHeaders['x-amz-version-id'] =
|
return next(null, sseConfig, currentSpan);
|
||||||
versionIdUtils.encode(storingResult.versionId,
|
}
|
||||||
config.versionIdEncodingType);
|
);
|
||||||
|
},
|
||||||
|
(sseConfig, currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Retrieved Server-Side Encryption Configuration');
|
||||||
|
currentSpan.end();
|
||||||
|
return next(null, sseConfig);
|
||||||
|
},
|
||||||
|
(sseConfig, next) => tracer.startActiveSpan('Creating Cipher Bundle from SSE Configuration', undefined, objectPutSpanContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'objectPut()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 205,
|
||||||
|
});
|
||||||
|
return next(null, sseConfig, currentSpan);
|
||||||
|
}),
|
||||||
|
function createCipherBundle(serverSideEncryptionConfig, currentSpan, next) {
|
||||||
|
if (serverSideEncryptionConfig) {
|
||||||
|
activeSpan.addEvent('Creating KMS Cipher Bundle');
|
||||||
|
currentSpan.end();
|
||||||
|
objectPutSpan.end();
|
||||||
|
return kms.createCipherBundle(
|
||||||
|
serverSideEncryptionConfig, log, next);
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('No Server-Side Encryption Configuration Found');
|
||||||
|
return next(null, currentSpan);
|
||||||
|
},
|
||||||
|
(currentSpan, next) => {
|
||||||
|
activeSpan.addEvent('Got Server-Side Encryption Configuration');
|
||||||
|
currentSpan.end();
|
||||||
|
return next();
|
||||||
|
},
|
||||||
|
(next) => tracer.startActiveSpan('Validating and Updating Object Data in RING', undefined, objectPutSpanContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'objectPut()',
|
||||||
|
'code.filename': 'lib/api/objectPut.js',
|
||||||
|
'code.lineno': 229,
|
||||||
|
});
|
||||||
|
return next(null, null, currentSpan);
|
||||||
|
}),
|
||||||
|
function objectCreateAndStore(cipherBundle, currentSpan, next) {
|
||||||
|
const currentSpanContext = opentelemetry.trace.setSpan(
|
||||||
|
objectPutSpanContext,
|
||||||
|
currentSpan,
|
||||||
|
);
|
||||||
|
activeSpan.addEvent('Started Object Creation and Storage');
|
||||||
|
const objectLockValidationError
|
||||||
|
= validateHeaders(bucket, headers, log);
|
||||||
|
if (objectLockValidationError) {
|
||||||
|
activeSpan.recordException(objectLockValidationError);
|
||||||
|
currentSpan.end();
|
||||||
|
objectPutSpan.end();
|
||||||
|
return next(objectLockValidationError);
|
||||||
|
}
|
||||||
|
writeContinue(request, request._response);
|
||||||
|
return createAndStoreObject(bucketName,
|
||||||
|
bucket, objectKey, objMD, authInfo, canonicalID, cipherBundle,
|
||||||
|
request, false, streamingV4Params, overheadField, log, (err, storingResult) => {
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
|
objectPutSpan.end();
|
||||||
|
return next(err);
|
||||||
|
}
|
||||||
|
return next(null, storingResult, currentSpan);
|
||||||
|
}, { activeSpan, activeTracerContext: currentSpanContext, tracer });
|
||||||
|
},
|
||||||
|
], (err, storingResult, currentSpan) => {
|
||||||
|
currentSpan.end();
|
||||||
|
if (err) {
|
||||||
|
monitoring.promMetrics('PUT', bucketName, err.code,
|
||||||
|
'putObject');
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
objectPutSpan.end();
|
||||||
|
return callback(err, responseHeaders);
|
||||||
}
|
}
|
||||||
}
|
activeSpan.addEvent('Completed Object Creation and Storage');
|
||||||
|
// ingestSize assumes that these custom headers indicate
|
||||||
|
// an ingestion PUT which is a metadata only operation.
|
||||||
|
// Since these headers can be modified client side, they
|
||||||
|
// should be used with caution if needed for precise
|
||||||
|
// metrics.
|
||||||
|
const ingestSize = (request.headers['x-amz-meta-mdonly']
|
||||||
|
&& !Number.isNaN(request.headers['x-amz-meta-size']))
|
||||||
|
? Number.parseInt(request.headers['x-amz-meta-size'], 10) : null;
|
||||||
|
const newByteLength = parsedContentLength;
|
||||||
|
|
||||||
// Only pre-existing non-versioned objects get 0 all others use 1
|
setExpirationHeaders(responseHeaders, {
|
||||||
const numberOfObjects = !isVersionedObj && oldByteLength !== null ? 0 : 1;
|
lifecycleConfig: bucket.getLifecycleConfiguration(),
|
||||||
|
objectParams: {
|
||||||
|
key: objectKey,
|
||||||
|
date: storingResult.lastModified,
|
||||||
|
tags: storingResult.tags,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
// only the bucket owner's metrics should be updated, regardless of
|
// Utapi expects null or a number for oldByteLength:
|
||||||
// who the requester is
|
// * null - new object
|
||||||
pushMetric('putObject', log, {
|
// * 0 or > 0 - existing object with content-length 0 or > 0
|
||||||
authInfo,
|
// objMD here is the master version that we would
|
||||||
canonicalID: bucket.getOwner(),
|
// have overwritten if there was an existing version or object
|
||||||
bucket: bucketName,
|
//
|
||||||
keys: [objectKey],
|
// TODO: Handle utapi metrics for null version overwrites.
|
||||||
newByteLength,
|
const oldByteLength = objMD && objMD['content-length']
|
||||||
oldByteLength: isVersionedObj ? null : oldByteLength,
|
!== undefined ? objMD['content-length'] : null;
|
||||||
versionId: isVersionedObj && storingResult ? storingResult.versionId : undefined,
|
if (storingResult) {
|
||||||
location: bucket.getLocationConstraint(),
|
// ETag's hex should always be enclosed in quotes
|
||||||
numberOfObjects,
|
responseHeaders.ETag = `"${storingResult.contentMD5}"`;
|
||||||
|
}
|
||||||
|
const vcfg = bucket.getVersioningConfiguration();
|
||||||
|
const isVersionedObj = vcfg && vcfg.Status === 'Enabled';
|
||||||
|
if (isVersionedObj) {
|
||||||
|
if (storingResult && storingResult.versionId) {
|
||||||
|
responseHeaders['x-amz-version-id'] =
|
||||||
|
versionIdUtils.encode(storingResult.versionId,
|
||||||
|
config.versionIdEncodingType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only pre-existing non-versioned objects get 0 all others use 1
|
||||||
|
const numberOfObjects = !isVersionedObj && oldByteLength !== null ? 0 : 1;
|
||||||
|
|
||||||
|
// only the bucket owner's metrics should be updated, regardless of
|
||||||
|
// who the requester is
|
||||||
|
pushMetric('putObject', log, {
|
||||||
|
authInfo,
|
||||||
|
canonicalID: bucket.getOwner(),
|
||||||
|
bucket: bucketName,
|
||||||
|
keys: [objectKey],
|
||||||
|
newByteLength,
|
||||||
|
oldByteLength: isVersionedObj ? null : oldByteLength,
|
||||||
|
versionId: isVersionedObj && storingResult ? storingResult.versionId : undefined,
|
||||||
|
location: bucket.getLocationConstraint(),
|
||||||
|
numberOfObjects,
|
||||||
|
});
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'aws.s3.object.size': newByteLength,
|
||||||
|
'aws.s3.region': bucket.getLocationConstraint(),
|
||||||
|
'aws.s3.objects.count': numberOfObjects,
|
||||||
|
'aws.s3.upload_id': storingResult ? storingResult.uploadId : undefined,
|
||||||
|
});
|
||||||
|
monitoring.promMetrics('PUT', bucketName, '200',
|
||||||
|
'putObject', newByteLength, oldByteLength, isVersionedObj,
|
||||||
|
null, ingestSize);
|
||||||
|
activeSpan.addEvent('Exiting objectPut() Function');
|
||||||
|
objectPutSpan.end();
|
||||||
|
return callback(null, responseHeaders);
|
||||||
});
|
});
|
||||||
monitoring.promMetrics('PUT', bucketName, '200',
|
}, { activeSpan, activeTracerContext: mdSpanContext, tracer });
|
||||||
'putObject', newByteLength, oldByteLength, isVersionedObj,
|
|
||||||
null, ingestSize);
|
|
||||||
return callback(null, responseHeaders);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ const { config } = require('../Config');
|
||||||
* @param {function} cb - cb to server
|
* @param {function} cb - cb to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutACL(authInfo, request, log, cb) {
|
function objectPutACL(authInfo, request, log, cb, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutACL' });
|
log.debug('processing request', { method: 'objectPutACL' });
|
||||||
const { bucketName, objectKey } = request;
|
const { bucketName, objectKey } = request;
|
||||||
const newCannedACL = request.headers['x-amz-acl'];
|
const newCannedACL = request.headers['x-amz-acl'];
|
||||||
|
@ -130,7 +130,7 @@ function objectPutACL(authInfo, request, log, cb) {
|
||||||
return next(errors.MethodNotAllowed, bucket);
|
return next(errors.MethodNotAllowed, bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function parseAclFromXml(bucket, objectMD, next) {
|
function parseAclFromXml(bucket, objectMD, next) {
|
||||||
// If not setting acl through headers, parse body
|
// If not setting acl through headers, parse body
|
||||||
|
|
|
@ -33,7 +33,7 @@ const skipError = new Error('skip');
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutCopyPart(authInfo, request, sourceBucket,
|
function objectPutCopyPart(authInfo, request, sourceBucket,
|
||||||
sourceObject, reqVersionId, log, callback) {
|
sourceObject, reqVersionId, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutCopyPart' });
|
log.debug('processing request', { method: 'objectPutCopyPart' });
|
||||||
const destBucketName = request.bucketName;
|
const destBucketName = request.bucketName;
|
||||||
const destObjectKey = request.objectKey;
|
const destObjectKey = request.objectKey;
|
||||||
|
@ -108,7 +108,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
||||||
return next(errors.NoSuchBucket);
|
return next(errors.NoSuchBucket);
|
||||||
}
|
}
|
||||||
return next(null, destBucketMD);
|
return next(null, destBucketMD);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
function checkSourceAuthorization(destBucketMD, next) {
|
function checkSourceAuthorization(destBucketMD, next) {
|
||||||
return standardMetadataValidateBucketAndObj(valGetParams, request.actionImplicitDenies, log,
|
return standardMetadataValidateBucketAndObj(valGetParams, request.actionImplicitDenies, log,
|
||||||
|
@ -179,7 +179,7 @@ function objectPutCopyPart(authInfo, request, sourceBucket,
|
||||||
return next(null, copyLocator.dataLocator, destBucketMD,
|
return next(null, copyLocator.dataLocator, destBucketMD,
|
||||||
copyLocator.copyObjectSize, sourceVerId,
|
copyLocator.copyObjectSize, sourceVerId,
|
||||||
sourceLocationConstraintName);
|
sourceLocationConstraintName);
|
||||||
});
|
}, oTel);
|
||||||
},
|
},
|
||||||
// get MPU shadow bucket to get splitter based on MD version
|
// get MPU shadow bucket to get splitter based on MD version
|
||||||
function getMpuShadowBucket(dataLocator, destBucketMD,
|
function getMpuShadowBucket(dataLocator, destBucketMD,
|
||||||
|
|
|
@ -22,7 +22,7 @@ const REPLICATION_ACTION = 'PUT_LEGAL_HOLD';
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutLegalHold(authInfo, request, log, callback) {
|
function objectPutLegalHold(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutLegalHold' });
|
log.debug('processing request', { method: 'objectPutLegalHold' });
|
||||||
|
|
||||||
const { bucketName, objectKey } = request;
|
const { bucketName, objectKey } = request;
|
||||||
|
@ -77,7 +77,7 @@ function objectPutLegalHold(authInfo, request, log, callback) {
|
||||||
), bucket);
|
), bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
log.trace('parsing legal hold');
|
log.trace('parsing legal hold');
|
||||||
parseLegalHoldXml(request.post, log, (err, res) =>
|
parseLegalHoldXml(request.post, log, (err, res) =>
|
||||||
|
|
|
@ -51,10 +51,12 @@ function _getPartKey(uploadId, splitter, paddedPartNumber) {
|
||||||
* (to be used for streaming v4 auth if applicable)
|
* (to be used for streaming v4 auth if applicable)
|
||||||
* @param {object} log - Werelogs logger
|
* @param {object} log - Werelogs logger
|
||||||
* @param {function} cb - final callback to call with the result
|
* @param {function} cb - final callback to call with the result
|
||||||
|
* @param {object} authorizationResults authrorization results
|
||||||
|
* @param {object} oTel - OpenTelemetry activeSpan, context and Tracer object
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutPart(authInfo, request, streamingV4Params, log,
|
function objectPutPart(authInfo, request, streamingV4Params, log,
|
||||||
cb) {
|
cb, authorizationResults, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutPart' });
|
log.debug('processing request', { method: 'objectPutPart' });
|
||||||
const size = request.parsedContentLength;
|
const size = request.parsedContentLength;
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ const REPLICATION_ACTION = 'PUT_RETENTION';
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutRetention(authInfo, request, log, callback) {
|
function objectPutRetention(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutRetention' });
|
log.debug('processing request', { method: 'objectPutRetention' });
|
||||||
|
|
||||||
const { bucketName, objectKey } = request;
|
const { bucketName, objectKey } = request;
|
||||||
|
@ -91,7 +91,7 @@ function objectPutRetention(authInfo, request, log, callback) {
|
||||||
), bucket);
|
), bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, retentionInfo, objectMD);
|
return next(null, bucket, retentionInfo, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, retentionInfo, objectMD, next) => {
|
(bucket, retentionInfo, objectMD, next) => {
|
||||||
const hasGovernanceBypass = hasGovernanceBypassHeader(request.headers);
|
const hasGovernanceBypass = hasGovernanceBypassHeader(request.headers);
|
||||||
if (hasGovernanceBypass && authInfo.isRequesterAnIAMUser()) {
|
if (hasGovernanceBypass && authInfo.isRequesterAnIAMUser()) {
|
||||||
|
@ -103,7 +103,7 @@ function objectPutRetention(authInfo, request, log, callback) {
|
||||||
return next(err, bucket);
|
return next(err, bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, retentionInfo, hasGovernanceBypass, objectMD);
|
return next(null, bucket, retentionInfo, hasGovernanceBypass, objectMD);
|
||||||
});
|
}, oTel);
|
||||||
}
|
}
|
||||||
return next(null, bucket, retentionInfo, hasGovernanceBypass, objectMD);
|
return next(null, bucket, retentionInfo, hasGovernanceBypass, objectMD);
|
||||||
},
|
},
|
||||||
|
|
|
@ -23,7 +23,7 @@ const REPLICATION_ACTION = 'PUT_TAGGING';
|
||||||
* @param {function} callback - callback to server
|
* @param {function} callback - callback to server
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function objectPutTagging(authInfo, request, log, callback) {
|
function objectPutTagging(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'objectPutTagging' });
|
log.debug('processing request', { method: 'objectPutTagging' });
|
||||||
|
|
||||||
const { bucketName, objectKey } = request;
|
const { bucketName, objectKey } = request;
|
||||||
|
@ -71,7 +71,7 @@ function objectPutTagging(authInfo, request, log, callback) {
|
||||||
return next(errors.MethodNotAllowed, bucket);
|
return next(errors.MethodNotAllowed, bucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objectMD);
|
return next(null, bucket, objectMD);
|
||||||
}),
|
}, oTel),
|
||||||
(bucket, objectMD, next) => {
|
(bucket, objectMD, next) => {
|
||||||
log.trace('parsing tag(s)');
|
log.trace('parsing tag(s)');
|
||||||
parseTagXml(request.post, log, (err, tags) =>
|
parseTagXml(request.post, log, (err, tags) =>
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
const { errors } = require('arsenal');
|
const { errors } = require('arsenal');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const constants = require('../../constants');
|
const constants = require('../../constants');
|
||||||
const services = require('../services');
|
const services = require('../services');
|
||||||
|
@ -49,45 +50,74 @@ function generateXml(xml, owner, userBuckets, splitter) {
|
||||||
* @param {object} request - normalized request object
|
* @param {object} request - normalized request object
|
||||||
* @param {object} log - Werelogs logger
|
* @param {object} log - Werelogs logger
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
|
* @param {object} oTel - OpenTelemetry methods
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
function serviceGet(authInfo, request, log, callback) {
|
function serviceGet(authInfo, request, log, callback, oTel) {
|
||||||
log.debug('processing request', { method: 'serviceGet' });
|
const {
|
||||||
|
cloudserverApiSpan,
|
||||||
if (authInfo.isRequesterPublicUser()) {
|
activeSpan,
|
||||||
log.debug('operation not available for public user');
|
activeTracerContext,
|
||||||
monitoring.promMetrics(
|
tracer,
|
||||||
'GET', request.bucketName, 403, 'getService');
|
} = oTel;
|
||||||
return callback(errors.AccessDenied);
|
activeSpan.addEvent('Entered serviceGet()');
|
||||||
}
|
const cloudserverApiSpanContext = opentelemetry.trace.setSpan(
|
||||||
const xml = [];
|
activeTracerContext,
|
||||||
const canonicalId = authInfo.getCanonicalID();
|
cloudserverApiSpan,
|
||||||
xml.push(
|
|
||||||
'<?xml version="1.0" encoding="UTF-8"?>',
|
|
||||||
'<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/' +
|
|
||||||
'2006-03-01/">',
|
|
||||||
'<Owner>',
|
|
||||||
`<ID>${canonicalId}</ID>`,
|
|
||||||
`<DisplayName>${authInfo.getAccountDisplayName()}` +
|
|
||||||
'</DisplayName>',
|
|
||||||
'</Owner>',
|
|
||||||
'<Buckets>'
|
|
||||||
);
|
);
|
||||||
return services.getService(authInfo, request, log, constants.splitter,
|
return tracer.startActiveSpan('ListBuckets API:: Checking bucket policies, and listing buckets', undefined, cloudserverApiSpanContext, serviceGetSpan => {
|
||||||
(err, userBuckets, splitter) => {
|
serviceGetSpan.setAttributes({
|
||||||
if (err) {
|
'code.function': 'serviceGet()',
|
||||||
monitoring.promMetrics(
|
'code.filename': 'lib/api/serviceGet.js',
|
||||||
'GET', userBuckets, err.code, 'getService');
|
'code.lineno': 56,
|
||||||
return callback(err);
|
|
||||||
}
|
|
||||||
// TODO push metric for serviceGet
|
|
||||||
// pushMetric('getService', log, {
|
|
||||||
// bucket: bucketName,
|
|
||||||
// });
|
|
||||||
monitoring.promMetrics('GET', userBuckets, '200', 'getService');
|
|
||||||
return callback(null, generateXml(xml, canonicalId, userBuckets,
|
|
||||||
splitter));
|
|
||||||
});
|
});
|
||||||
|
const ctx = opentelemetry.trace.setSpan(
|
||||||
|
activeTracerContext,
|
||||||
|
serviceGetSpan,
|
||||||
|
);
|
||||||
|
log.debug('processing request', { method: 'serviceGet' });
|
||||||
|
|
||||||
|
if (authInfo.isRequesterPublicUser()) {
|
||||||
|
log.debug('operation not available for public user');
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', request.bucketName, 403, 'getService');
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
|
serviceGetSpan.end();
|
||||||
|
return callback(errors.AccessDenied);
|
||||||
|
}
|
||||||
|
const xml = [];
|
||||||
|
const canonicalId = authInfo.getCanonicalID();
|
||||||
|
xml.push(
|
||||||
|
'<?xml version="1.0" encoding="UTF-8"?>',
|
||||||
|
'<ListAllMyBucketsResult xmlns="http://s3.amazonaws.com/doc/' +
|
||||||
|
'2006-03-01/">',
|
||||||
|
'<Owner>',
|
||||||
|
`<ID>${canonicalId}</ID>`,
|
||||||
|
`<DisplayName>${authInfo.getAccountDisplayName()}` +
|
||||||
|
'</DisplayName>',
|
||||||
|
'</Owner>',
|
||||||
|
'<Buckets>'
|
||||||
|
);
|
||||||
|
return services.getService(authInfo, request, log, constants.splitter,
|
||||||
|
(err, userBuckets, splitter) => {
|
||||||
|
if (err) {
|
||||||
|
monitoring.promMetrics(
|
||||||
|
'GET', userBuckets, err.code, 'getService');
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
serviceGetSpan.end();
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
// TODO push metric for serviceGet
|
||||||
|
// pushMetric('getService', log, {
|
||||||
|
// bucket: bucketName,
|
||||||
|
// });
|
||||||
|
monitoring.promMetrics('GET', userBuckets, '200', 'getService');
|
||||||
|
activeSpan.addEvent('Leaving serviceGet()');
|
||||||
|
serviceGetSpan.end();
|
||||||
|
return callback(null, generateXml(xml, canonicalId, userBuckets,
|
||||||
|
splitter));
|
||||||
|
}, undefined, { activeSpan, activeTracerContext: ctx, tracer });
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = serviceGet;
|
module.exports = serviceGet;
|
||||||
|
|
|
@ -169,6 +169,7 @@ function validateBucket(bucket, params, log, actionImplicitDenies = {}) {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** standardMetadataValidateBucketAndObj - retrieve bucket and object md from metadata
|
/** standardMetadataValidateBucketAndObj - retrieve bucket and object md from metadata
|
||||||
* and check if user is authorized to access them.
|
* and check if user is authorized to access them.
|
||||||
* @param {object} params - function parameters
|
* @param {object} params - function parameters
|
||||||
|
@ -183,59 +184,101 @@ function validateBucket(bucket, params, log, actionImplicitDenies = {}) {
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined} - and call callback with params err, bucket md
|
* @return {undefined} - and call callback with params err, bucket md
|
||||||
*/
|
*/
|
||||||
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback) {
|
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback, oTel) {
|
||||||
|
const {
|
||||||
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
|
tracer,
|
||||||
|
} = oTel;
|
||||||
|
activeSpan.addEvent('Entered standardMetadataValidateBucketAndObj()');
|
||||||
const { authInfo, bucketName, objectKey, versionId, getDeleteMarker, request } = params;
|
const { authInfo, bucketName, objectKey, versionId, getDeleteMarker, request } = params;
|
||||||
let requestType = params.requestType;
|
let requestType = params.requestType;
|
||||||
if (!Array.isArray(requestType)) {
|
if (!Array.isArray(requestType)) {
|
||||||
requestType = [requestType];
|
requestType = [requestType];
|
||||||
}
|
}
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
next => {
|
next => tracer.startActiveSpan('Fetching Metadata for Bucket and Object', undefined, activeTracerContext, getBucketAndObjectMDSpan => {
|
||||||
// versionId may be 'null', which asks metadata to fetch the null key specifically
|
getBucketAndObjectMDSpan.setAttributes({
|
||||||
|
'code.function': 'standardMetadataValidateBucketAndObj()',
|
||||||
|
'code.filename': 'lib/metadata/metadataUtils.js',
|
||||||
|
'code.lineno': 213,
|
||||||
|
});
|
||||||
const getOptions = { versionId };
|
const getOptions = { versionId };
|
||||||
if (getDeleteMarker) {
|
if (getDeleteMarker) {
|
||||||
getOptions.getDeleteMarker = true;
|
getOptions.getDeleteMarker = true;
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Initiating Metadata Fetch for Bucket and Object');
|
||||||
|
// if some implicit iamAuthzResults, return AccessDenied
|
||||||
|
// before leaking any state information
|
||||||
return metadata.getBucketAndObjectMD(bucketName, objectKey, getOptions, log, (err, getResult) => {
|
return metadata.getBucketAndObjectMD(bucketName, objectKey, getOptions, log, (err, getResult) => {
|
||||||
|
getBucketAndObjectMDSpan.end();
|
||||||
if (err) {
|
if (err) {
|
||||||
// if some implicit iamAuthzResults, return AccessDenied
|
|
||||||
// before leaking any state information
|
|
||||||
if (actionImplicitDenies && Object.values(actionImplicitDenies).some(v => v === true)) {
|
if (actionImplicitDenies && Object.values(actionImplicitDenies).some(v => v === true)) {
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
return next(errors.AccessDenied);
|
return next(errors.AccessDenied);
|
||||||
}
|
}
|
||||||
|
activeSpan.recordException(err);
|
||||||
return next(err);
|
return next(err);
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Completed Metadata Fetch for Bucket and Object');
|
||||||
return next(null, getResult);
|
return next(null, getResult);
|
||||||
});
|
});
|
||||||
},
|
}),
|
||||||
(getResult, next) => {
|
(getResult, next) => {
|
||||||
const bucket = getResult.bucket ?
|
return tracer.startActiveSpan('Validating Bucket and Object Metadata and Bucket Policies', undefined, activeTracerContext, validateBucketAndObjectMDSpan => {
|
||||||
BucketInfo.deSerialize(getResult.bucket) : undefined;
|
validateBucketAndObjectMDSpan.setAttributes({
|
||||||
if (!bucket) {
|
'code.function': 'standardMetadataValidateBucketAndObj()',
|
||||||
log.debug('bucketAttrs is undefined', {
|
'code.filename': 'lib/metadata/metadataUtils.js',
|
||||||
bucket: bucketName,
|
'code.lineno': 237,
|
||||||
method: 'metadataValidateBucketAndObj',
|
|
||||||
});
|
});
|
||||||
return next(errors.NoSuchBucket);
|
const bucket = getResult.bucket ?
|
||||||
}
|
BucketInfo.deSerialize(getResult.bucket) : undefined;
|
||||||
const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
|
if (!bucket) {
|
||||||
if (validationError) {
|
log.debug('bucketAttrs is undefined', {
|
||||||
return next(validationError, bucket);
|
bucket: bucketName,
|
||||||
}
|
method: 'metadataValidateBucketAndObj',
|
||||||
const objMD = getResult.obj ? JSON.parse(getResult.obj) : undefined;
|
});
|
||||||
if (!objMD && versionId === 'null') {
|
activeSpan.recordException(errors.NoSuchBucket);
|
||||||
return getNullVersionFromMaster(bucketName, objectKey, log,
|
validateBucketAndObjectMDSpan.end();
|
||||||
(err, nullVer) => next(err, bucket, nullVer));
|
return next(errors.NoSuchBucket);
|
||||||
}
|
}
|
||||||
return next(null, bucket, objMD);
|
const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
|
||||||
|
if (validationError) {
|
||||||
|
activeSpan.recordException(validationError);
|
||||||
|
validateBucketAndObjectMDSpan.end();
|
||||||
|
return next(validationError, bucket);
|
||||||
|
}
|
||||||
|
const objMD = getResult.obj ? JSON.parse(getResult.obj) : undefined;
|
||||||
|
if (!objMD && versionId === 'null') {
|
||||||
|
activeSpan.addEvent('Fetching Null Version from Master');
|
||||||
|
return getNullVersionFromMaster(bucketName, objectKey, log,
|
||||||
|
(err, nullVer) => {
|
||||||
|
activeSpan.addEvent('Completed Null Version Fetch from Master');
|
||||||
|
validateBucketAndObjectMDSpan.end();
|
||||||
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
validateBucketAndObjectMDSpan.end();
|
||||||
|
return next(err, bucket);
|
||||||
|
}
|
||||||
|
activeSpan.addEvent('Completed Bucket and Object Metadata Validation');
|
||||||
|
return next(null, bucket, nullVer);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
validateBucketAndObjectMDSpan.end();
|
||||||
|
activeSpan.addEvent('Completed Bucket and Object Metadata Validation');
|
||||||
|
return next(null, bucket, objMD);
|
||||||
|
});
|
||||||
},
|
},
|
||||||
(bucket, objMD, next) => {
|
(bucket, objMD, next) => {
|
||||||
|
activeSpan.addEvent('Initiating Bucket Policy Authorization Checks');
|
||||||
const canonicalID = authInfo.getCanonicalID();
|
const canonicalID = authInfo.getCanonicalID();
|
||||||
if (!isObjAuthorized(bucket, objMD, requestType, canonicalID, authInfo, log, request,
|
if (!isObjAuthorized(bucket, objMD, requestType, canonicalID, authInfo, log, request,
|
||||||
actionImplicitDenies)) {
|
actionImplicitDenies)) {
|
||||||
log.debug('access denied for user on object', { requestType });
|
log.debug('access denied for user on object', { requestType });
|
||||||
|
activeSpan.recordException(errors.AccessDenied);
|
||||||
return next(errors.AccessDenied, bucket);
|
return next(errors.AccessDenied, bucket);
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Completed Bucket Policy Authorization Checks');
|
||||||
return next(null, bucket, objMD);
|
return next(null, bucket, objMD);
|
||||||
},
|
},
|
||||||
], (err, bucket, objMD) => {
|
], (err, bucket, objMD) => {
|
||||||
|
@ -243,9 +286,11 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
||||||
// still return bucket for cors headers
|
// still return bucket for cors headers
|
||||||
return callback(err, bucket);
|
return callback(err, bucket);
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Leaving standardMetadataValidateBucketAndObj()');
|
||||||
return callback(null, bucket, objMD);
|
return callback(null, bucket, objMD);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/** standardMetadataValidateBucket - retrieve bucket from metadata and check if user
|
/** standardMetadataValidateBucket - retrieve bucket from metadata and check if user
|
||||||
* is authorized to access it
|
* is authorized to access it
|
||||||
* @param {object} params - function parameters
|
* @param {object} params - function parameters
|
||||||
|
@ -258,7 +303,34 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
||||||
* @param {function} callback - callback
|
* @param {function} callback - callback
|
||||||
* @return {undefined} - and call callback with params err, bucket md
|
* @return {undefined} - and call callback with params err, bucket md
|
||||||
*/
|
*/
|
||||||
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback) {
|
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback, oTel) {
|
||||||
|
if (oTel) {
|
||||||
|
const {
|
||||||
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
|
tracer,
|
||||||
|
} = oTel;
|
||||||
|
activeSpan.addEvent('Entered standardMetadataValidateBucket()');
|
||||||
|
return tracer.startActiveSpan('Fetching Metadata for Bucket', undefined, activeTracerContext, getBucketMDSpan => {
|
||||||
|
const { bucketName } = params;
|
||||||
|
return metadata.getBucket(bucketName, log, (err, bucket) => {
|
||||||
|
getBucketMDSpan.end();
|
||||||
|
if (err) {
|
||||||
|
// if some implicit actionImplicitDenies, return AccessDenied before
|
||||||
|
// leaking any state information
|
||||||
|
if (actionImplicitDenies && Object.values(actionImplicitDenies).some(v => v === true)) {
|
||||||
|
return callback(errors.AccessDenied);
|
||||||
|
}
|
||||||
|
log.debug('metadata getbucket failed', { error: err });
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
|
||||||
|
activeSpan.addEvent('Completed Bucket Metadata Fetch');
|
||||||
|
return callback(validationError, bucket);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
const { bucketName } = params;
|
const { bucketName } = params;
|
||||||
return metadata.getBucket(bucketName, log, (err, bucket) => {
|
return metadata.getBucket(bucketName, log, (err, bucket) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ const url = require('url');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const httpProxy = require('http-proxy');
|
const httpProxy = require('http-proxy');
|
||||||
const querystring = require('querystring');
|
const querystring = require('querystring');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const backbeatProxy = httpProxy.createProxyServer({
|
const backbeatProxy = httpProxy.createProxyServer({
|
||||||
ignorePath: true,
|
ignorePath: true,
|
||||||
|
@ -1213,6 +1214,31 @@ const backbeatRoutes = {
|
||||||
};
|
};
|
||||||
|
|
||||||
function routeBackbeat(clientIP, request, response, log) {
|
function routeBackbeat(clientIP, request, response, log) {
|
||||||
|
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME || 'cloudserver-service');
|
||||||
|
let activeSpan = opentelemetry.trace.getActiveSpan(opentelemetry.context.active());
|
||||||
|
if (!activeSpan) {
|
||||||
|
// generic name, name will be updates later when we extract more context
|
||||||
|
activeSpan = tracer.startSpan('routeRequest');
|
||||||
|
}
|
||||||
|
const activeTracerContext = opentelemetry.trace.setSpan(
|
||||||
|
opentelemetry.context.active(),
|
||||||
|
activeSpan,
|
||||||
|
);
|
||||||
|
activeSpan.addEvent('New request received by NodeJS Cloudserver server');
|
||||||
|
// const traceId = activeSpan.spanContext().traceId;
|
||||||
|
// const spanId = activeSpan.spanContext().spanId;
|
||||||
|
// const exemplarLabels = { traceId, spanId };
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'code.filepath': 'lib/server.js',
|
||||||
|
'code.function': 'routeRequest()',
|
||||||
|
'code.lineno': 98,
|
||||||
|
'rpc.service': 'S3',
|
||||||
|
});
|
||||||
|
const oTel = {
|
||||||
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
|
tracer,
|
||||||
|
};
|
||||||
// Attach the apiMethod method to the request, so it can used by monitoring in the server
|
// Attach the apiMethod method to the request, so it can used by monitoring in the server
|
||||||
// eslint-disable-next-line no-param-reassign
|
// eslint-disable-next-line no-param-reassign
|
||||||
request.apiMethod = 'routeBackbeat';
|
request.apiMethod = 'routeBackbeat';
|
||||||
|
@ -1341,7 +1367,7 @@ function routeBackbeat(clientIP, request, response, log) {
|
||||||
requestType: request.apiMethods || 'ReplicateObject',
|
requestType: request.apiMethods || 'ReplicateObject',
|
||||||
request,
|
request,
|
||||||
};
|
};
|
||||||
return standardMetadataValidateBucketAndObj(mdValParams, request.actionImplicitDenies, log, next);
|
return standardMetadataValidateBucketAndObj(mdValParams, request.actionImplicitDenies, log, next, oTel);
|
||||||
},
|
},
|
||||||
(bucketInfo, objMd, next) => {
|
(bucketInfo, objMd, next) => {
|
||||||
if (useMultipleBackend) {
|
if (useMultipleBackend) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ const http = require('http');
|
||||||
const https = require('https');
|
const https = require('https');
|
||||||
const cluster = require('cluster');
|
const cluster = require('cluster');
|
||||||
const { series } = require('async');
|
const { series } = require('async');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
const arsenal = require('arsenal');
|
const arsenal = require('arsenal');
|
||||||
const { RedisClient, StatsClient } = arsenal.metrics;
|
const { RedisClient, StatsClient } = arsenal.metrics;
|
||||||
const monitoringClient = require('./utilities/monitoringHandler');
|
const monitoringClient = require('./utilities/monitoringHandler');
|
||||||
|
@ -89,6 +90,30 @@ class S3Server {
|
||||||
* @returns {void}
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
routeRequest(req, res) {
|
routeRequest(req, res) {
|
||||||
|
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME || 'cloudserver-service');
|
||||||
|
let activeSpan = opentelemetry.trace.getActiveSpan(opentelemetry.context.active());
|
||||||
|
if (!activeSpan) {
|
||||||
|
// generic name, name will be updates later when we extract more context
|
||||||
|
activeSpan = tracer.startSpan('routeRequest');
|
||||||
|
}
|
||||||
|
const activeTracerContext = opentelemetry.trace.setSpan(
|
||||||
|
opentelemetry.context.active(),
|
||||||
|
activeSpan,
|
||||||
|
);
|
||||||
|
activeSpan.addEvent('New request received by NodeJS Cloudserver server');
|
||||||
|
// const traceId = activeSpan.spanContext().traceId;
|
||||||
|
// const spanId = activeSpan.spanContext().spanId;
|
||||||
|
// const exemplarLabels = { traceId, spanId };
|
||||||
|
activeSpan.setAttributes({
|
||||||
|
'code.filepath': 'lib/server.js',
|
||||||
|
'code.function': 'routeRequest()',
|
||||||
|
'code.lineno': 98,
|
||||||
|
'rpc.service': 'S3',
|
||||||
|
'rpc.system': 'aws-api',
|
||||||
|
'code.url': 'https://github.com/scality/cloudserver/blob/development/7.70/lib/server.js#L91',
|
||||||
|
'server.address': process.env.HOSTNAME || 'anurag-local',
|
||||||
|
'server.port': 8000,
|
||||||
|
});
|
||||||
metrics.httpActiveRequests.inc();
|
metrics.httpActiveRequests.inc();
|
||||||
const requestStartTime = process.hrtime.bigint();
|
const requestStartTime = process.hrtime.bigint();
|
||||||
|
|
||||||
|
@ -109,6 +134,8 @@ class S3Server {
|
||||||
if (req.apiMethod) {
|
if (req.apiMethod) {
|
||||||
labels.action = req.apiMethod;
|
labels.action = req.apiMethod;
|
||||||
}
|
}
|
||||||
|
activeSpan.addEvent('Response sent to client');
|
||||||
|
activeSpan.end();
|
||||||
metrics.httpRequestsTotal.labels(labels).inc();
|
metrics.httpRequestsTotal.labels(labels).inc();
|
||||||
metrics.httpRequestDurationSeconds
|
metrics.httpRequestDurationSeconds
|
||||||
.labels(labels)
|
.labels(labels)
|
||||||
|
@ -138,6 +165,11 @@ class S3Server {
|
||||||
metadata,
|
metadata,
|
||||||
locStorageCheckFn: locationStorageCheck,
|
locStorageCheckFn: locationStorageCheck,
|
||||||
vault,
|
vault,
|
||||||
|
oTel: {
|
||||||
|
tracer,
|
||||||
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
routes(req, res, params, logger, _config);
|
routes(req, res, params, logger, _config);
|
||||||
|
|
168
lib/services.js
168
lib/services.js
|
@ -1,4 +1,5 @@
|
||||||
const assert = require('assert');
|
const assert = require('assert');
|
||||||
|
const opentelemetry = require('@opentelemetry/api');
|
||||||
|
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const { errors, s3middleware } = require('arsenal');
|
const { errors, s3middleware } = require('arsenal');
|
||||||
|
@ -29,48 +30,67 @@ function setLastModifiedFromHeader(md, metaHeaders) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const services = {
|
const services = {
|
||||||
getService(authInfo, request, log, splitter, cb, overrideUserbucket) {
|
getService(authInfo, request, log, splitter, cb, overrideUserbucket, oTel) {
|
||||||
const canonicalID = authInfo.getCanonicalID();
|
const {
|
||||||
assert.strictEqual(typeof splitter, 'string');
|
activeSpan,
|
||||||
const prefix = `${canonicalID}${splitter}`;
|
activeTracerContext,
|
||||||
const bucketUsers = overrideUserbucket || usersBucket;
|
tracer,
|
||||||
// Note: we are limiting max keys on a bucket listing to 10000
|
} = oTel;
|
||||||
// AWS does not limit but they only allow 100 buckets
|
return tracer.startActiveSpan(`Metadata request for ${overrideUserbucket ? `${overrideUserbucket}` : `${usersBucket}`}`, undefined, activeTracerContext, serviceGetServiceSpan => {
|
||||||
// (without special increase)
|
serviceGetServiceSpan.setAttributes({
|
||||||
// TODO: Consider implementing pagination like object listing
|
'code.function': 'services.getService()',
|
||||||
// with respect to bucket listing so can go beyond 10000
|
'code.filename': 'lib/services.js',
|
||||||
metadata.listObject(bucketUsers, { prefix, maxKeys: 10000 }, log,
|
'code.lineno': 33,
|
||||||
(err, listResponse) => {
|
|
||||||
// If MD responds with NoSuchBucket, this means the
|
|
||||||
// hidden usersBucket has not yet been created for
|
|
||||||
// the domain. If this is the case, it means
|
|
||||||
// that no buckets in this domain have been created so
|
|
||||||
// it follows that this particular user has no buckets.
|
|
||||||
// So, the get service listing should not have any
|
|
||||||
// buckets to list. By returning an empty array, the
|
|
||||||
// getService API will just respond with the user info
|
|
||||||
// without listing any buckets.
|
|
||||||
if (err && err.NoSuchBucket) {
|
|
||||||
log.trace('no buckets found');
|
|
||||||
// If we checked the old user bucket, that means we
|
|
||||||
// already checked the new user bucket. If neither the
|
|
||||||
// old user bucket or the new user bucket exist, no buckets
|
|
||||||
// have yet been created in the namespace so an empty
|
|
||||||
// listing should be returned
|
|
||||||
if (overrideUserbucket) {
|
|
||||||
return cb(null, [], splitter);
|
|
||||||
}
|
|
||||||
// Since there were no results from checking the
|
|
||||||
// new users bucket, we check the old users bucket
|
|
||||||
return this.getService(authInfo, request, log,
|
|
||||||
constants.oldSplitter, cb, oldUsersBucket);
|
|
||||||
}
|
|
||||||
if (err) {
|
|
||||||
log.error('error from metadata', { error: err });
|
|
||||||
return cb(err);
|
|
||||||
}
|
|
||||||
return cb(null, listResponse.Contents, splitter);
|
|
||||||
});
|
});
|
||||||
|
const canonicalID = authInfo.getCanonicalID();
|
||||||
|
assert.strictEqual(typeof splitter, 'string');
|
||||||
|
const prefix = `${canonicalID}${splitter}`;
|
||||||
|
const bucketUsers = overrideUserbucket || usersBucket;
|
||||||
|
// Note: we are limiting max keys on a bucket listing to 10000
|
||||||
|
// AWS does not limit but they only allow 100 buckets
|
||||||
|
// (without special increase)
|
||||||
|
// TODO: Consider implementing pagination like object listing
|
||||||
|
// with respect to bucket listing so can go beyond 10000
|
||||||
|
metadata.listObject(bucketUsers, { prefix, maxKeys: 10000 }, log,
|
||||||
|
(err, listResponse) => {
|
||||||
|
// If MD responds with NoSuchBucket, this means the
|
||||||
|
// hidden usersBucket has not yet been created for
|
||||||
|
// the domain. If this is the case, it means
|
||||||
|
// that no buckets in this domain have been created so
|
||||||
|
// it follows that this particular user has no buckets.
|
||||||
|
// So, the get service listing should not have any
|
||||||
|
// buckets to list. By returning an empty array, the
|
||||||
|
// getService API will just respond with the user info
|
||||||
|
// without listing any buckets.
|
||||||
|
if (err && err.NoSuchBucket) {
|
||||||
|
serviceGetServiceSpan.end();
|
||||||
|
log.trace('no buckets found');
|
||||||
|
// If we checked the old user bucket, that means we
|
||||||
|
// already checked the new user bucket. If neither the
|
||||||
|
// old user bucket or the new user bucket exist, no buckets
|
||||||
|
// have yet been created in the namespace so an empty
|
||||||
|
// listing should be returned
|
||||||
|
if (overrideUserbucket) {
|
||||||
|
return cb(null, [], splitter);
|
||||||
|
}
|
||||||
|
serviceGetServiceSpan.end();
|
||||||
|
activeSpan.addEvent('Checking metadata for old user buckets');
|
||||||
|
// Since there were no results from checking the
|
||||||
|
// new users bucket, we check the old users bucket
|
||||||
|
return this.getService(authInfo, request, log,
|
||||||
|
constants.oldSplitter, cb, oldUsersBucket);
|
||||||
|
}
|
||||||
|
if (err) {
|
||||||
|
log.error('error from metadata', { error: err });
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
serviceGetServiceSpan.end();
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
serviceGetServiceSpan.end();
|
||||||
|
activeSpan.addEvent('Got metadata, exiting serviceGet()');
|
||||||
|
return cb(null, listResponse.Contents, splitter);
|
||||||
|
});
|
||||||
|
});
|
||||||
},
|
},
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -98,9 +118,21 @@ const services = {
|
||||||
* @param {object} cipherBundle - server side encryption information
|
* @param {object} cipherBundle - server side encryption information
|
||||||
* @param {object} params - custom built object containing resource details.
|
* @param {object} params - custom built object containing resource details.
|
||||||
* @param {function} cb - callback containing result for the next task
|
* @param {function} cb - callback containing result for the next task
|
||||||
|
* @param {function} oTel - opentelemetry methods
|
||||||
* @return {function} executes callback with err or ETag as arguments
|
* @return {function} executes callback with err or ETag as arguments
|
||||||
*/
|
*/
|
||||||
metadataStoreObject(bucketName, dataGetInfo, cipherBundle, params, cb) {
|
metadataStoreObject(bucketName, dataGetInfo, cipherBundle, params, cb, oTel) {
|
||||||
|
const {
|
||||||
|
activeSpan,
|
||||||
|
activeTracerContext,
|
||||||
|
tracer,
|
||||||
|
} = oTel;
|
||||||
|
const objectStorageParamSpan = tracer.startSpan('Extracting Parameters For Metadata Storage', undefined, activeTracerContext);
|
||||||
|
objectStorageParamSpan.setAttributes({
|
||||||
|
'code.function': 'createAndStoreObject()',
|
||||||
|
'code.filename': 'lib/api/apiUtils/object/createAndStoreObject.js',
|
||||||
|
'code.lineno': 87,
|
||||||
|
});
|
||||||
const { objectKey, authInfo, size, contentMD5, metaHeaders,
|
const { objectKey, authInfo, size, contentMD5, metaHeaders,
|
||||||
contentType, cacheControl, contentDisposition, contentEncoding,
|
contentType, cacheControl, contentDisposition, contentEncoding,
|
||||||
expires, multipart, headers, overrideMetadata, log,
|
expires, multipart, headers, overrideMetadata, log,
|
||||||
|
@ -215,7 +247,11 @@ const services = {
|
||||||
error: validationTagRes,
|
error: validationTagRes,
|
||||||
method: 'metadataStoreObject',
|
method: 'metadataStoreObject',
|
||||||
});
|
});
|
||||||
return process.nextTick(() => cb(validationTagRes));
|
return process.nextTick(() => {
|
||||||
|
activeSpan.recordException(validationTagRes);
|
||||||
|
objectStorageParamSpan.end();
|
||||||
|
return cb(validationTagRes);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
md.setTags(validationTagRes);
|
md.setTags(validationTagRes);
|
||||||
}
|
}
|
||||||
|
@ -242,13 +278,22 @@ const services = {
|
||||||
}
|
}
|
||||||
|
|
||||||
log.trace('object metadata', { omVal: md.getValue() });
|
log.trace('object metadata', { omVal: md.getValue() });
|
||||||
|
objectStorageParamSpan.end();
|
||||||
// If this is not the completion of a multipart upload or
|
// If this is not the completion of a multipart upload or
|
||||||
// the creation of a delete marker, parse the headers to
|
// the creation of a delete marker, parse the headers to
|
||||||
// get the ACL's if any
|
// get the ACL's if any
|
||||||
return async.waterfall([
|
return async.waterfall([
|
||||||
callback => {
|
next => tracer.startActiveSpan('Parsed And Processed ACLs If Needed', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'metadataStoreObject()',
|
||||||
|
'code.filename': 'lib/services.js',
|
||||||
|
'code.lineno': 294,
|
||||||
|
});
|
||||||
|
return next(null, currentSpan);
|
||||||
|
}),
|
||||||
|
(currentSpan, callback) => {
|
||||||
if (multipart || md.getIsDeleteMarker()) {
|
if (multipart || md.getIsDeleteMarker()) {
|
||||||
return callback();
|
return callback(null, currentSpan);
|
||||||
}
|
}
|
||||||
const parseAclParams = {
|
const parseAclParams = {
|
||||||
headers,
|
headers,
|
||||||
|
@ -260,17 +305,42 @@ const services = {
|
||||||
acl.parseAclFromHeaders(parseAclParams, (err, parsedACL) => {
|
acl.parseAclFromHeaders(parseAclParams, (err, parsedACL) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
log.debug('error parsing acl', { error: err });
|
log.debug('error parsing acl', { error: err });
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
return callback(err);
|
return callback(err);
|
||||||
}
|
}
|
||||||
md.setAcl(parsedACL);
|
md.setAcl(parsedACL);
|
||||||
return callback();
|
return callback(null, currentSpan);
|
||||||
});
|
});
|
||||||
return null;
|
return null;
|
||||||
},
|
},
|
||||||
callback => metadata.putObjectMD(bucketName, objectKey, md,
|
(currentSpan, callback) => {
|
||||||
options, log, callback),
|
activeSpan.addEvent('Parsed and processed ACLs if needed');
|
||||||
], (err, data) => {
|
currentSpan.end();
|
||||||
|
return callback();
|
||||||
|
},
|
||||||
|
next => tracer.startActiveSpan('Update object metadata - PUT', undefined, activeTracerContext, currentSpan => {
|
||||||
|
currentSpan.setAttributes({
|
||||||
|
'code.function': 'metadataStoreObject()',
|
||||||
|
'code.filename': 'lib/services.js',
|
||||||
|
'code.lineno': 329,
|
||||||
|
});
|
||||||
|
return next(null, currentSpan);
|
||||||
|
}),
|
||||||
|
(currentSpan, callback) => metadata.putObjectMD(bucketName, objectKey, md,
|
||||||
|
options, log, (err, data) => {
|
||||||
|
if (err) {
|
||||||
|
log.error('error from metadata', { error: err });
|
||||||
|
activeSpan.recordException(err);
|
||||||
|
currentSpan.end();
|
||||||
|
return callback(err);
|
||||||
|
}
|
||||||
|
return callback(null, data, currentSpan);
|
||||||
|
}),
|
||||||
|
], (err, data, currentSpan) => {
|
||||||
|
currentSpan.end();
|
||||||
if (err) {
|
if (err) {
|
||||||
|
activeSpan.recordException(err);
|
||||||
log.error('error from metadata', { error: err });
|
log.error('error from metadata', { error: err });
|
||||||
return cb(err);
|
return cb(err);
|
||||||
}
|
}
|
||||||
|
|
24
package.json
24
package.json
|
@ -20,7 +20,17 @@
|
||||||
"homepage": "https://github.com/scality/S3#readme",
|
"homepage": "https://github.com/scality/S3#readme",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@hapi/joi": "^17.1.0",
|
"@hapi/joi": "^17.1.0",
|
||||||
"arsenal": "git+https://github.com/scality/arsenal#7.70.29",
|
"@opentelemetry/api": "^1.9.0",
|
||||||
|
"@opentelemetry/auto-instrumentations-node": "^0.47.1",
|
||||||
|
"@opentelemetry/exporter-metrics-otlp-proto": "^0.52.0",
|
||||||
|
"@opentelemetry/exporter-trace-otlp-proto": "^0.52.0",
|
||||||
|
"@opentelemetry/instrumentation-ioredis": "^0.41.0",
|
||||||
|
"@opentelemetry/resources": "^1.25.0",
|
||||||
|
"@opentelemetry/sdk-metrics": "^1.25.0",
|
||||||
|
"@opentelemetry/sdk-trace-base": "^1.25.0",
|
||||||
|
"@opentelemetry/sdk-trace-web": "^1.25.0",
|
||||||
|
"@opentelemetry/semantic-conventions": "^1.25.0",
|
||||||
|
"arsenal": "git+https://github.com/scality/Arsenal#d90bc4f741446012885640e93d45052fad4f7ea6",
|
||||||
"async": "~2.5.0",
|
"async": "~2.5.0",
|
||||||
"aws-sdk": "2.905.0",
|
"aws-sdk": "2.905.0",
|
||||||
"azure-storage": "^2.1.0",
|
"azure-storage": "^2.1.0",
|
||||||
|
@ -82,13 +92,13 @@
|
||||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||||
"mem_backend": "S3BACKEND=mem node index.js",
|
"mem_backend": "S3BACKEND=mem node index.js",
|
||||||
"start": "npm-run-all --parallel start_dmd start_s3server",
|
"start": "npm-run-all --parallel start_dmd start_s3server",
|
||||||
"start_mdserver": "node mdserver.js",
|
"start_mdserver": "OTEL_SERVICE_NAME=cisco-s3-mdserver node --require ./instrumentation.js mdserver.js",
|
||||||
"start_dataserver": "node dataserver.js",
|
"start_dataserver": "OTEL_SERVICE_NAME=cisco-s3-dataserver node --require ./instrumentation.js dataserver.js",
|
||||||
"start_s3server": "node index.js",
|
"start_s3server": "OTEL_SERVICE_NAME=cisco-s3-cloudserver node --require ./instrumentation.js index.js",
|
||||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||||
"start_utapi": "node lib/utapi/utapi.js",
|
"start_utapi": "OTEL_SERVICE_NAME=cisco-s3-utapi node --require ./instrumentation.js lib/utapi/utapi.js",
|
||||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
"utapi_replay": "OTEL_SERVICE_NAME=cisco-s3-utapi-replay node --require ./instrumentation.js lib/utapi/utapiReplay.js",
|
||||||
"utapi_reindex": "node lib/utapi/utapiReindex.js",
|
"utapi_reindex": "OTEL_SERVICE_NAME=cisco-s3-utapi-reindex node --require ./instrumentation.js lib/utapi/utapiReindex.js",
|
||||||
"test": "CI=true S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit",
|
"test": "CI=true S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit",
|
||||||
"test_versionid_base62": "VERSION_ID_ENCODING_TYPE=base62 CI=true S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit/api",
|
"test_versionid_base62": "VERSION_ID_ENCODING_TYPE=base62 CI=true S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit/api",
|
||||||
"test_legacy_location": "CI=true S3_LOCATION_FILE=tests/locationConfig/locationConfigLegacy.json S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit",
|
"test_legacy_location": "CI=true S3_LOCATION_FILE=tests/locationConfig/locationConfigLegacy.json S3BACKEND=mem mocha --reporter mocha-multi-reporters --reporter-options configFile=$INIT_CWD/tests/reporter-config.json --recursive tests/unit",
|
||||||
|
|
Loading…
Reference in New Issue