Compare commits
40 Commits
developmen
...
feature/S3
Author | SHA1 | Date |
---|---|---|
Anurag Mittal | 1abae5844a | |
Anurag Mittal | 9c7298c915 | |
Anurag Mittal | 50669bd150 | |
Anurag Mittal | ee91142dba | |
Anurag Mittal | 08a8df9539 | |
Anurag Mittal | 028133b661 | |
Anurag Mittal | cfd8aebe6c | |
Anurag Mittal | 1d7baa4ce4 | |
Anurag Mittal | da142fc3dd | |
Anurag Mittal | afb0bc5cf7 | |
Anurag Mittal | a85b994247 | |
Anurag Mittal | 34eb2be606 | |
Anurag Mittal | ad68e7a389 | |
Anurag Mittal | 77c4c460dc | |
Anurag Mittal | 4bfc81c096 | |
Anurag Mittal | 6855774e98 | |
Anurag Mittal | 27509916be | |
Anurag Mittal | 07daf73fef | |
Anurag Mittal | af401a0d21 | |
Anurag Mittal | f659176bbe | |
Anurag Mittal | 0e00893017 | |
Anurag Mittal | 1e66cf6ca1 | |
Anurag Mittal | 46fffd43be | |
Anurag Mittal | 5920f3893d | |
Anurag Mittal | d5877e8274 | |
Anurag Mittal | 82a80fe41f | |
Anurag Mittal | aba023878b | |
Anurag Mittal | 11be9ad1cd | |
Anurag Mittal | 4f3da868bf | |
Anurag Mittal | bc08242d7b | |
Anurag Mittal | 0a8fcc6a6b | |
Anurag Mittal | 6eeabb567c | |
Anurag Mittal | b5ff082efc | |
Anurag Mittal | a206324b76 | |
Anurag Mittal | 55d8ab1053 | |
Anurag Mittal | 5e4d8640c6 | |
Anurag Mittal | 83d3016830 | |
Anurag Mittal | ed743a743a | |
Anurag Mittal | 81d349a6bd | |
Anurag Mittal | c9e24bedf5 |
|
@ -1,6 +1,7 @@
|
|||
import assert from 'assert';
|
||||
|
||||
import { RequestLogger } from 'werelogs';
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
import errors from '../errors';
|
||||
import routeGET from './routes/routeGET';
|
||||
|
@ -13,8 +14,10 @@ import * as routesUtils from './routesUtils';
|
|||
import routeWebsite from './routes/routeWebsite';
|
||||
import * as http from 'http';
|
||||
import StatsClient from '../metrics/StatsClient';
|
||||
import { actionMonitoringMapS3 } from '../policyEvaluator/utils/actionMaps';
|
||||
|
||||
import * as requestUtils from '../../lib/policyEvaluator/requestUtils';
|
||||
import { policies } from '../..';
|
||||
|
||||
const routeMap = {
|
||||
GET: routeGET,
|
||||
|
@ -172,6 +175,8 @@ export type Params = {
|
|||
* data retrieval function
|
||||
* @param logger - werelogs logger instance
|
||||
* @param [s3config] - s3 configuration
|
||||
* @param [parentSpanFromCloudserver] - server span from cloudserver with context
|
||||
* @param [tracer] - opentelemetry tracer
|
||||
*/
|
||||
export default function routes(
|
||||
req: http.IncomingMessage,
|
||||
|
@ -179,112 +184,149 @@ export default function routes(
|
|||
params: Params,
|
||||
logger: RequestLogger,
|
||||
s3config?: any,
|
||||
parentSpanFromCloudserver?: any,
|
||||
tracer?: any,
|
||||
) {
|
||||
checkTypes(req, res, params, logger);
|
||||
parentSpanFromCloudserver.addEvent('Arsenal::routes() Validating and processing request');
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
parentSpanFromCloudserver,
|
||||
);
|
||||
const spanOptions = { links: [{ context: parentSpanFromCloudserver.spanContext() }] };
|
||||
return tracer.startActiveSpan('Using Arsenal to validate request', spanOptions, ctx, requestValidatorSpan => {
|
||||
requestValidatorSpan.setAttribute('code.function', 'routes');
|
||||
requestValidatorSpan.setAttribute('code.filepath', 'arsenal/lib/s3routes/routes.ts');
|
||||
requestValidatorSpan.setAttribute('code.lineno', 192);
|
||||
checkTypes(req, res, params, logger);
|
||||
const {
|
||||
api,
|
||||
internalHandlers,
|
||||
statsClient,
|
||||
allEndpoints,
|
||||
websiteEndpoints,
|
||||
blacklistedPrefixes,
|
||||
dataRetrievalParams,
|
||||
} = params;
|
||||
|
||||
const {
|
||||
api,
|
||||
internalHandlers,
|
||||
statsClient,
|
||||
allEndpoints,
|
||||
websiteEndpoints,
|
||||
blacklistedPrefixes,
|
||||
dataRetrievalParams,
|
||||
} = params;
|
||||
const clientInfo = {
|
||||
clientIP: requestUtils.getClientIp(req, s3config),
|
||||
clientPort: req.socket.remotePort,
|
||||
httpMethod: req.method,
|
||||
httpURL: req.url,
|
||||
// @ts-ignore
|
||||
endpoint: req.endpoint,
|
||||
};
|
||||
|
||||
const clientInfo = {
|
||||
clientIP: requestUtils.getClientIp(req, s3config),
|
||||
clientPort: req.socket.remotePort,
|
||||
httpMethod: req.method,
|
||||
httpURL: req.url,
|
||||
// @ts-ignore
|
||||
endpoint: req.endpoint,
|
||||
};
|
||||
|
||||
let reqUids = req.headers['x-scal-request-uids'];
|
||||
if (reqUids !== undefined && !isValidReqUids(reqUids)) {
|
||||
// simply ignore invalid id (any user can provide an
|
||||
// invalid request ID through a crafted header)
|
||||
reqUids = undefined;
|
||||
}
|
||||
const log = (reqUids !== undefined ?
|
||||
// @ts-ignore
|
||||
logger.newRequestLoggerFromSerializedUids(reqUids) :
|
||||
// @ts-ignore
|
||||
logger.newRequestLogger());
|
||||
|
||||
if (!req.url!.startsWith('/_/healthcheck')) {
|
||||
log.info('received request', clientInfo);
|
||||
}
|
||||
|
||||
log.end().addDefaultFields(clientInfo);
|
||||
|
||||
if (req.url!.startsWith('/_/')) {
|
||||
let internalServiceName = req.url!.slice(3);
|
||||
const serviceDelim = internalServiceName.indexOf('/');
|
||||
if (serviceDelim !== -1) {
|
||||
internalServiceName = internalServiceName.slice(0, serviceDelim);
|
||||
let reqUids = req.headers['x-scal-request-uids'];
|
||||
if (reqUids !== undefined && !isValidReqUids(reqUids)) {
|
||||
// simply ignore invalid id (any user can provide an
|
||||
// invalid request ID through a crafted header)
|
||||
reqUids = undefined;
|
||||
}
|
||||
if (internalHandlers[internalServiceName] === undefined) {
|
||||
const log = (reqUids !== undefined ?
|
||||
// @ts-ignore
|
||||
logger.newRequestLoggerFromSerializedUids(reqUids) :
|
||||
// @ts-ignore
|
||||
logger.newRequestLogger());
|
||||
|
||||
if (!req.url!.startsWith('/_/healthcheck')) {
|
||||
log.info('received request', clientInfo);
|
||||
}
|
||||
|
||||
log.end().addDefaultFields(clientInfo);
|
||||
|
||||
if (req.url!.startsWith('/_/')) {
|
||||
let internalServiceName = req.url!.slice(3);
|
||||
const serviceDelim = internalServiceName.indexOf('/');
|
||||
if (serviceDelim !== -1) {
|
||||
internalServiceName = internalServiceName.slice(0, serviceDelim);
|
||||
}
|
||||
if (internalHandlers[internalServiceName] === undefined) {
|
||||
return routesUtils.responseXMLBody(
|
||||
errors.InvalidURI, null, res, log);
|
||||
}
|
||||
return internalHandlers[internalServiceName](
|
||||
clientInfo.clientIP, req, res, log, statsClient);
|
||||
}
|
||||
|
||||
if (statsClient) {
|
||||
// report new request for stats
|
||||
statsClient.reportNewRequest('s3');
|
||||
}
|
||||
|
||||
try {
|
||||
const validHosts = allEndpoints.concat(websiteEndpoints);
|
||||
routesUtils.normalizeRequest(req, validHosts);
|
||||
} catch (err: any) {
|
||||
log.debug('could not normalize request', { error: err.stack });
|
||||
return routesUtils.responseXMLBody(
|
||||
errors.InvalidURI, null, res, log);
|
||||
errors.InvalidURI.customizeDescription('Could not parse the ' +
|
||||
'specified URI. Check your restEndpoints configuration.'),
|
||||
null, res, log);
|
||||
}
|
||||
return internalHandlers[internalServiceName](
|
||||
clientInfo.clientIP, req, res, log, statsClient);
|
||||
}
|
||||
|
||||
if (statsClient) {
|
||||
// report new request for stats
|
||||
statsClient.reportNewRequest('s3');
|
||||
}
|
||||
log.addDefaultFields({
|
||||
// @ts-ignore
|
||||
bucketName: req.bucketName,
|
||||
// @ts-ignore
|
||||
objectKey: req.objectKey,
|
||||
// @ts-ignore
|
||||
bytesReceived: req.parsedContentLength || 0,
|
||||
// @ts-ignore
|
||||
bodyLength: parseInt(req.headers['content-length'], 10) || 0,
|
||||
});
|
||||
// @ts-ignore
|
||||
parentSpanFromCloudserver.setAttribute('aws.s3.bucket', req.bucketName);
|
||||
// @ts-ignore
|
||||
parentSpanFromCloudserver.setAttribute('aws.s3.key', req.objectKey);
|
||||
// parentSpanFromCloudserver.setAttribute('aws.s3.request_id', reqUids);
|
||||
// @ts-ignore
|
||||
requestValidatorSpan.setAttribute('aws.s3.bucket', req.bucketName);
|
||||
// @ts-ignore
|
||||
requestValidatorSpan.setAttribute("aws.s3.key", req.objectKey);
|
||||
// requestValidatorSpan.setAttribute('aws.s3.request_id', reqUids);
|
||||
// TODO: Use this due to high cardinality
|
||||
// parentSpanFromCloudserver.updateName(`${req.method}`);
|
||||
// @ts-ignore
|
||||
// if(req.objectKey){
|
||||
// // @ts-ignore
|
||||
// parentSpanFromCloudserver.updateName(`${req.method} ${req.bucketName}/${req.objectKey}`);
|
||||
// // @ts-ignore
|
||||
// } else if (req.bucketName){
|
||||
// // @ts-ignore
|
||||
// parentSpanFromCloudserver.updateName(`${req.method} ${req.bucketName}`);
|
||||
// } else {
|
||||
// // @ts-ignore
|
||||
// parentSpanFromCloudserver.updateName(`${req.method}`);
|
||||
// }
|
||||
// span.setAttribute('aws.s3.upload_id', req.query.uploadId);
|
||||
|
||||
try {
|
||||
const validHosts = allEndpoints.concat(websiteEndpoints);
|
||||
routesUtils.normalizeRequest(req, validHosts);
|
||||
} catch (err: any) {
|
||||
log.debug('could not normalize request', { error: err.stack });
|
||||
return routesUtils.responseXMLBody(
|
||||
errors.InvalidURI.customizeDescription('Could not parse the ' +
|
||||
'specified URI. Check your restEndpoints configuration.'),
|
||||
null, res, log);
|
||||
}
|
||||
// @ts-ignore
|
||||
const { error, method } = checkUnsupportedRoutes(req.method, req.query);
|
||||
|
||||
if (error) {
|
||||
log.trace('error validating route or uri params', { error });
|
||||
// @ts-ignore
|
||||
return routesUtils.responseXMLBody(error, '', res, log);
|
||||
}
|
||||
|
||||
log.addDefaultFields({
|
||||
// @ts-ignore
|
||||
bucketName: req.bucketName,
|
||||
const bucketOrKeyError = checkBucketAndKey(req.bucketName, req.objectKey,
|
||||
// @ts-ignore
|
||||
req.method, req.query, blacklistedPrefixes, log);
|
||||
|
||||
if (bucketOrKeyError) {
|
||||
log.trace('error with bucket or key value',
|
||||
{ error: bucketOrKeyError });
|
||||
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
||||
}
|
||||
requestValidatorSpan.addEvent('Arsenal::routes() Request Validated Successfully');
|
||||
requestValidatorSpan.end();
|
||||
// bucket website request
|
||||
// @ts-ignore
|
||||
objectKey: req.objectKey,
|
||||
// @ts-ignore
|
||||
bytesReceived: req.parsedContentLength || 0,
|
||||
// @ts-ignore
|
||||
bodyLength: parseInt(req.headers['content-length'], 10) || 0,
|
||||
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
||||
return routeWebsite(req, res, api, log, statsClient, dataRetrievalParams);
|
||||
}
|
||||
return method(req, res, api, log, statsClient, dataRetrievalParams, tracer, parentSpanFromCloudserver);
|
||||
});
|
||||
|
||||
// @ts-ignore
|
||||
const { error, method } = checkUnsupportedRoutes(req.method, req.query);
|
||||
|
||||
if (error) {
|
||||
log.trace('error validating route or uri params', { error });
|
||||
// @ts-ignore
|
||||
return routesUtils.responseXMLBody(error, '', res, log);
|
||||
}
|
||||
|
||||
// @ts-ignore
|
||||
const bucketOrKeyError = checkBucketAndKey(req.bucketName, req.objectKey,
|
||||
// @ts-ignore
|
||||
req.method, req.query, blacklistedPrefixes, log);
|
||||
|
||||
if (bucketOrKeyError) {
|
||||
log.trace('error with bucket or key value',
|
||||
{ error: bucketOrKeyError });
|
||||
return routesUtils.responseXMLBody(bucketOrKeyError, null, res, log);
|
||||
}
|
||||
|
||||
// bucket website request
|
||||
// @ts-ignore
|
||||
if (websiteEndpoints && websiteEndpoints.indexOf(req.parsedHost) > -1) {
|
||||
return routeWebsite(req, res, api, log, statsClient, dataRetrievalParams);
|
||||
}
|
||||
|
||||
return method(req, res, api, log, statsClient, dataRetrievalParams);
|
||||
}
|
||||
|
|
|
@ -11,6 +11,9 @@ export default function routeDELETE(
|
|||
api: { callApiMethod: routesUtils.CallApiMethod },
|
||||
log: RequestLogger,
|
||||
statsClient?: StatsClient,
|
||||
dataRetrievalParams?: any,
|
||||
tracer?: any,
|
||||
parentSpanFromCloudserver?: any,
|
||||
) {
|
||||
const call = (name: string) => {
|
||||
return api.callApiMethod(name, request, response, log, (err, corsHeaders) => {
|
||||
|
@ -20,7 +23,7 @@ export default function routeDELETE(
|
|||
}
|
||||
log.debug('routing request', { method: 'routeDELETE' });
|
||||
|
||||
const { query, objectKey } = request as any
|
||||
const { query, objectKey, bucketName } = request as any
|
||||
if (query?.uploadId) {
|
||||
if (objectKey === undefined) {
|
||||
const message = 'A key must be specified';
|
||||
|
@ -49,8 +52,14 @@ export default function routeDELETE(
|
|||
if (query?.tagging !== undefined) {
|
||||
return call('objectDeleteTagging');
|
||||
}
|
||||
parentSpanFromCloudserver.addEvent('Detected Object Delete API request');
|
||||
parentSpanFromCloudserver.updateName(`DeleteObject API with bucket: ${bucketName}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', 'PutObject');
|
||||
api.callApiMethod('objectDelete', request, response, log,
|
||||
(err, corsHeaders) => {
|
||||
parentSpanFromCloudserver.addEvent('DeleteObject API request completed');
|
||||
parentSpanFromCloudserver.end();
|
||||
/*
|
||||
* Since AWS expects a 204 regardless of the existence of
|
||||
the object, the errors NoSuchKey and NoSuchVersion should not
|
||||
|
@ -63,6 +72,6 @@ export default function routeDELETE(
|
|||
routesUtils.statsReport500(err, statsClient);
|
||||
return routesUtils.responseNoBody(null, corsHeaders, response,
|
||||
204, log);
|
||||
});
|
||||
}, tracer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ import * as routesUtils from '../routesUtils';
|
|||
import errors from '../../errors';
|
||||
import * as http from 'http';
|
||||
import StatsClient from '../../metrics/StatsClient';
|
||||
import { actionMonitoringMapS3 } from '../../policyEvaluator/utils/actionMaps';
|
||||
|
||||
export default function routerGET(
|
||||
request: http.IncomingMessage,
|
||||
|
@ -12,16 +13,23 @@ export default function routerGET(
|
|||
log: RequestLogger,
|
||||
statsClient?: StatsClient,
|
||||
dataRetrievalParams?: any,
|
||||
tracer?: any,
|
||||
parentSpanFromCloudserver?: any,
|
||||
) {
|
||||
log.debug('routing request', { method: 'routerGET' });
|
||||
|
||||
const { bucketName, objectKey, query } = request as any
|
||||
|
||||
const call = (name: string) => {
|
||||
const action = actionMonitoringMapS3[name];
|
||||
// @ts-ignore
|
||||
parentSpanFromCloudserver.updateName(`${action} API${request.bucketName ? ` with bucket: ${request.bucketName}` : ''}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', action);
|
||||
api.callApiMethod(name, request, response, log, (err, xml, corsHeaders) => {
|
||||
routesUtils.statsReport500(err, statsClient);
|
||||
return routesUtils.responseXMLBody(err, xml, response, log, corsHeaders);
|
||||
});
|
||||
}, tracer);
|
||||
}
|
||||
|
||||
if (bucketName === undefined && objectKey !== undefined) {
|
||||
|
@ -77,20 +85,24 @@ export default function routerGET(
|
|||
call('objectGetRetention');
|
||||
} else {
|
||||
// GET object
|
||||
parentSpanFromCloudserver.updateName(`GetObject API with bucket: ${bucketName}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', 'GetObject');
|
||||
api.callApiMethod('objectGet', request, response, log,
|
||||
(err, dataGetInfo, resMetaHeaders, range) => {
|
||||
(err, dataGetInfo, resMetaHeaders, range, apiSpan, callAPIMethodSpan) => {
|
||||
let contentLength = 0;
|
||||
if (resMetaHeaders && resMetaHeaders['Content-Length']) {
|
||||
contentLength = resMetaHeaders['Content-Length'];
|
||||
}
|
||||
// TODO ARSN-216 Fix logger
|
||||
apiSpan.addEvent('Located Data')
|
||||
// @ts-ignore
|
||||
log.end().addDefaultFields({ contentLength });
|
||||
routesUtils.statsReport500(err, statsClient);
|
||||
return routesUtils.responseStreamData(err, query,
|
||||
resMetaHeaders, dataGetInfo, dataRetrievalParams, response,
|
||||
range, log);
|
||||
});
|
||||
range, log, apiSpan, callAPIMethodSpan, tracer);
|
||||
}, tracer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,9 @@ export default function routeHEAD(
|
|||
api: { callApiMethod: routesUtils.CallApiMethod },
|
||||
log: RequestLogger,
|
||||
statsClient?: StatsClient,
|
||||
dataRetrievalParams?: any,
|
||||
tracer?: any,
|
||||
parentSpanFromCloudserver?: any,
|
||||
) {
|
||||
log.debug('routing request', { method: 'routeHEAD' });
|
||||
const { bucketName, objectKey } = request as any
|
||||
|
@ -20,19 +23,25 @@ export default function routeHEAD(
|
|||
null, response, log);
|
||||
} else if (objectKey === undefined) {
|
||||
// HEAD bucket
|
||||
parentSpanFromCloudserver.updateName(`HeadBucket API with bucket: ${bucketName}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', 'HeadBucket');
|
||||
api.callApiMethod('bucketHead', request, response, log,
|
||||
(err, corsHeaders) => {
|
||||
routesUtils.statsReport500(err, statsClient);
|
||||
return routesUtils.responseNoBody(err, corsHeaders, response,
|
||||
200, log);
|
||||
});
|
||||
}, tracer);
|
||||
} else {
|
||||
// HEAD object
|
||||
parentSpanFromCloudserver.updateName(`HeadObject API with bucket: ${bucketName}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', 'HeadObject');
|
||||
api.callApiMethod('objectHead', request, response, log,
|
||||
(err, resHeaders) => {
|
||||
routesUtils.statsReport500(err, statsClient);
|
||||
return routesUtils.responseContentHeaders(err, {}, resHeaders,
|
||||
response, log);
|
||||
});
|
||||
}, tracer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ export default function routeOPTIONS(
|
|||
api: { callApiMethod: routesUtils.CallApiMethod },
|
||||
log: RequestLogger,
|
||||
statsClient?: StatsClient,
|
||||
tracer?: any,
|
||||
) {
|
||||
log.debug('routing request', { method: 'routeOPTION' });
|
||||
|
||||
|
|
|
@ -10,6 +10,7 @@ export default function routePOST(
|
|||
response: http.ServerResponse,
|
||||
api: { callApiMethod: routesUtils.CallApiMethod },
|
||||
log: RequestLogger,
|
||||
tracer?: any,
|
||||
) {
|
||||
log.debug('routing request', { method: 'routePOST' });
|
||||
|
||||
|
|
|
@ -11,8 +11,12 @@ export default function routePUT(
|
|||
api: { callApiMethod: routesUtils.CallApiMethod },
|
||||
log: RequestLogger,
|
||||
statsClient?: StatsClient,
|
||||
dataRetrievalParams?: any,
|
||||
tracer?: any,
|
||||
parentSpanFromCloudserver?: any,
|
||||
) {
|
||||
log.debug('routing request', { method: 'routePUT' });
|
||||
parentSpanFromCloudserver.addEvent('checking which API to call');
|
||||
|
||||
const { objectKey, query, bucketName, parsedContentLength } = request as any
|
||||
|
||||
|
@ -221,12 +225,18 @@ export default function routePUT(
|
|||
// TODO ARSN-216 What's happening?
|
||||
// @ts-ignore
|
||||
log.end().addDefaultFields({ contentLength: request.parsedContentLength });
|
||||
parentSpanFromCloudserver.addEvent('Detected Object Put API request');
|
||||
parentSpanFromCloudserver.updateName(`PutObject API with bucket: ${bucketName}`);
|
||||
parentSpanFromCloudserver.setAttribute('aws.request_id', log.getUids()[0]);
|
||||
parentSpanFromCloudserver.setAttribute('rpc.method', 'PutObject');
|
||||
api.callApiMethod('objectPut', request, response, log,
|
||||
(err, resHeaders) => {
|
||||
routesUtils.statsReport500(err, statsClient);
|
||||
parentSpanFromCloudserver.addEvent('PutObject API request completed');
|
||||
parentSpanFromCloudserver.end();
|
||||
return routesUtils.responseNoBody(err, resHeaders,
|
||||
response, 200, log);
|
||||
});
|
||||
}, tracer);
|
||||
}
|
||||
}
|
||||
return undefined;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import * as url from 'url';
|
||||
import * as http from 'http';
|
||||
import { eachSeries } from 'async';
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
import { RequestLogger } from 'werelogs';
|
||||
|
||||
|
@ -16,6 +17,7 @@ export type CallApiMethod = (
|
|||
response: http.ServerResponse,
|
||||
log: RequestLogger,
|
||||
callback: (err: ArsenalError | null, ...data: any[]) => void,
|
||||
tracer?: any,
|
||||
) => void;
|
||||
|
||||
/**
|
||||
|
@ -350,6 +352,9 @@ function retrieveData(
|
|||
retrieveDataParams: any,
|
||||
response: http.ServerResponse,
|
||||
log: RequestLogger,
|
||||
apiSpan?: any,
|
||||
callApiMethodSpan?: any,
|
||||
tracer?: any,
|
||||
) {
|
||||
if (locations.length === 0) {
|
||||
return response.end();
|
||||
|
@ -368,6 +373,11 @@ function retrieveData(
|
|||
// the S3-client might close the connection while we are processing it
|
||||
response.once('close', () => {
|
||||
responseDestroyed = true;
|
||||
if (apiSpan) {
|
||||
apiSpan.addEvent('response closed by client request');
|
||||
apiSpan.end();
|
||||
callApiMethodSpan.end();
|
||||
}
|
||||
if (currentStream) {
|
||||
currentStream.destroy();
|
||||
}
|
||||
|
@ -382,60 +392,84 @@ function retrieveData(
|
|||
locStorageCheckFn,
|
||||
vault,
|
||||
} = retrieveDataParams;
|
||||
// need special context for HTTPs requests going externally from cloudserver
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
apiSpan,
|
||||
);
|
||||
const data = new DataWrapper(
|
||||
client, implName, config, kms, metadata, locStorageCheckFn, vault);
|
||||
return eachSeries(locations,
|
||||
(current, next) => data.get(current, response, log,
|
||||
(err: any, readable: http.IncomingMessage) => {
|
||||
// NB: readable is of IncomingMessage type
|
||||
if (err) {
|
||||
log.error('failed to get object', {
|
||||
error: err,
|
||||
method: 'retrieveData',
|
||||
});
|
||||
_destroyResponse();
|
||||
return next(err);
|
||||
}
|
||||
// response.isclosed is set by the S3 server. Might happen if
|
||||
// the S3-client closes the connection before the first request
|
||||
// to the backend is started.
|
||||
// @ts-expect-error
|
||||
if (responseDestroyed || response.isclosed) {
|
||||
log.debug(
|
||||
'response destroyed before readable could stream');
|
||||
readable.destroy();
|
||||
const responseErr = new Error();
|
||||
// @ts-ignore
|
||||
responseErr.code = 'ResponseError';
|
||||
responseErr.message = 'response closed by client request before all data sent';
|
||||
return next(responseErr);
|
||||
}
|
||||
// readable stream successfully consumed
|
||||
readable.on('end', () => {
|
||||
currentStream = null;
|
||||
log.debug('readable stream end reached');
|
||||
return next();
|
||||
});
|
||||
// errors on server side with readable stream
|
||||
readable.on('error', err => {
|
||||
log.error('error piping data from source');
|
||||
_destroyResponse();
|
||||
return next(err);
|
||||
});
|
||||
currentStream = readable;
|
||||
return readable.pipe(response, { end: false });
|
||||
}), err => {
|
||||
currentStream = null;
|
||||
if (err) {
|
||||
log.debug('abort response due to error', {
|
||||
const spanOptions = { links: [{ context: apiSpan.spanContext() }] };
|
||||
return tracer.startActiveSpan('Getting Data using sproxyd', spanOptions, ctx, dataSpan => {
|
||||
dataSpan.setAttribute('code.function', 'retrieveData');
|
||||
dataSpan.setAttribute('code.filepath', 'lib/s3routes/routesUtils.js');
|
||||
dataSpan.setAttribute('code.lineno', 400);
|
||||
return eachSeries(locations,
|
||||
(current, next) => data.get(current, response, log,
|
||||
(err: any, readable: http.IncomingMessage) => {
|
||||
// NB: readable is of IncomingMessage type
|
||||
if (err) {
|
||||
log.error('failed to get object', {
|
||||
error: err,
|
||||
method: 'retrieveData',
|
||||
});
|
||||
_destroyResponse();
|
||||
return next(err);
|
||||
}
|
||||
// response.isclosed is set by the S3 server. Might happen if
|
||||
// the S3-client closes the connection before the first request
|
||||
// to the backend is started.
|
||||
// @ts-expect-error
|
||||
error: err.code, errMsg: err.message });
|
||||
}
|
||||
// call end for all cases (error/success) per node.js docs
|
||||
// recommendation
|
||||
response.end();
|
||||
},
|
||||
);
|
||||
if (responseDestroyed || response.isclosed) {
|
||||
log.debug(
|
||||
'response destroyed before readable could stream');
|
||||
readable.destroy();
|
||||
const responseErr = new Error();
|
||||
// @ts-ignore
|
||||
responseErr.code = 'ResponseError';
|
||||
responseErr.message = 'response closed by client request before all data sent';
|
||||
return next(responseErr);
|
||||
}
|
||||
// readable stream successfully consumed
|
||||
readable.on('end', () => {
|
||||
currentStream = null;
|
||||
dataSpan.end();
|
||||
if (apiSpan) {
|
||||
apiSpan.addEvent('Readable stream successfully consumed');
|
||||
apiSpan.end();
|
||||
}
|
||||
log.debug('readable stream end reached');
|
||||
return next();
|
||||
});
|
||||
// errors on server side with readable stream
|
||||
readable.on('error', err => {
|
||||
dataSpan.end();
|
||||
if (apiSpan) {
|
||||
apiSpan.addEvent('Unable to stream object from Sproxyd');
|
||||
apiSpan.end();
|
||||
}
|
||||
log.error('error piping data from source');
|
||||
_destroyResponse();
|
||||
return process.nextTick(() => {
|
||||
callApiMethodSpan.end();
|
||||
return next(err);
|
||||
});
|
||||
});
|
||||
currentStream = readable;
|
||||
return readable.pipe(response, { end: false });
|
||||
}), err => {
|
||||
currentStream = null;
|
||||
if (err) {
|
||||
log.debug('abort response due to error', {
|
||||
// @ts-expect-error
|
||||
error: err.code, errMsg: err.message });
|
||||
}
|
||||
// call end for all cases (error/success) per node.js docs
|
||||
// recommendation
|
||||
response.end();
|
||||
},
|
||||
);
|
||||
})
|
||||
}
|
||||
|
||||
function _responseBody(
|
||||
|
@ -605,7 +639,11 @@ export function responseStreamData(
|
|||
response: http.ServerResponse,
|
||||
range: [number, number] | undefined,
|
||||
log: RequestLogger,
|
||||
apiSpan?: any,
|
||||
callApiMethodSpan?: any,
|
||||
tracer?: any,
|
||||
) {
|
||||
apiSpan.addEvent('Getting Data from Sproxyd');
|
||||
if (errCode && !response.headersSent) {
|
||||
return XMLResponseBackend.errorResponse(errCode, response, log,
|
||||
resHeaders);
|
||||
|
@ -640,12 +678,14 @@ export function responseStreamData(
|
|||
}
|
||||
response.on('finish', () => {
|
||||
// TODO ARSN-216 Fix logger
|
||||
callApiMethodSpan.addEvent('Sending response to Client');
|
||||
callApiMethodSpan.end();
|
||||
// @ts-expect-error
|
||||
log.end().info('responded with streamed content', {
|
||||
httpCode: response.statusCode,
|
||||
});
|
||||
});
|
||||
return retrieveData(dataLocations, retrieveDataParams, response, log);
|
||||
return retrieveData(dataLocations, retrieveDataParams, response, log, apiSpan, callApiMethodSpan, tracer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
"homepage": "https://github.com/scality/Arsenal#readme",
|
||||
"dependencies": {
|
||||
"@js-sdsl/ordered-set": "^4.4.2",
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"@types/async": "^3.2.12",
|
||||
"@types/utf8": "^3.0.1",
|
||||
"JSONStream": "^1.0.0",
|
||||
|
|
|
@ -1221,6 +1221,11 @@
|
|||
mkdirp "^1.0.4"
|
||||
rimraf "^3.0.2"
|
||||
|
||||
"@opentelemetry/api@^1.9.0":
|
||||
version "1.9.0"
|
||||
resolved "https://registry.yarnpkg.com/@opentelemetry/api/-/api-1.9.0.tgz#d03eba68273dc0f7509e2a3d5cba21eae10379fe"
|
||||
integrity sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==
|
||||
|
||||
"@sideway/address@^4.1.3":
|
||||
version "4.1.4"
|
||||
resolved "https://registry.yarnpkg.com/@sideway/address/-/address-4.1.4.tgz#03dccebc6ea47fdc226f7d3d1ad512955d4783f0"
|
||||
|
|
Loading…
Reference in New Issue