Compare commits

...

40 Commits

Author SHA1 Message Date
Anurag Mittal 70e03f9ecd
final instrumentation 2024-06-17 15:49:52 +02:00
Anurag Mittal bcea25d318
updated Otelstack 2024-06-17 15:18:39 +02:00
Anurag Mittal 4a408af758
done with delete object 2024-06-14 12:18:22 +02:00
Anurag Mittal 2854a4355a
deleteobject API, need to add spans for ObjectDelete method 2024-06-13 18:02:55 +02:00
Anurag Mittal b1e869f330
done with Put object 2024-06-13 16:58:18 +02:00
Anurag Mittal 7a9f40a91e
fixup added package.json 2024-06-13 16:16:20 +02:00
Anurag Mittal 63579e2782
putObject Working fine, need to link to data storage 2024-06-13 15:58:12 +02:00
Anurag Mittal c59c180547
updated logic for putobject in arsenal 2024-06-13 14:35:10 +02:00
Anurag Mittal 4c1680fa57
use service name from env 2024-06-13 12:20:03 +02:00
Anurag Mittal 0ff7357f5f
done with Put object 2024-06-13 09:44:08 +02:00
Anurag Mittal 6df85e9860
need to link API span with method span 2024-06-12 18:56:43 +02:00
Anurag Mittal f3aa959d5d
put object update 2024-06-12 18:17:34 +02:00
Anurag Mittal c17657f7e9
added bucket and request ID to trace tags 2024-06-12 14:47:46 +02:00
Anurag Mittal 2607edcf51
head object and head bucket working 2024-06-12 13:52:08 +02:00
Anurag Mittal c9e6dad068
bucket and object listing ready POC 2024-06-11 18:13:44 +02:00
Anurag Mittal 70b237d674
list bucket complete 2024-06-11 17:51:10 +02:00
Anurag Mittal 285eb6aa5d
updated bucketGet 2024-06-11 17:20:56 +02:00
Anurag Mittal be551f18a4
cleanup of traces, adding events at proper places for cloudserver and vault 2024-06-11 14:44:41 +02:00
Anurag Mittal 1119177c33
removed deleted datadog key 2024-06-11 13:27:19 +02:00
Anurag Mittal e35dc824d7
added honeycomb with latest OTel 2024-06-11 12:14:20 +02:00
Anurag Mittal 2b9339a24b
working Jaeger with latest collector over otlphttp 2024-06-11 11:56:14 +02:00
Anurag Mittal e5b445382f
moved to tempo with otlphttp 2024-06-11 11:50:58 +02:00
Anurag Mittal 198b59ad19
7 traces working with MD and data 2024-06-07 16:06:53 +02:00
Anurag Mittal e093dd2e39
working with 5 events linked 2024-06-07 15:47:47 +02:00
Anurag Mittal d6be7e5dd4
4 + 1 spans 2024-06-07 15:41:24 +02:00
Anurag Mittal 16b54f5a8e
pass around tracer in arsenal 2024-06-07 15:18:04 +02:00
Anurag Mittal 90d00ab3a9
4 span checkpoint for list bucket 2024-06-07 15:17:39 +02:00
Anurag Mittal 08b6b4ad73
finished passing tracer and span to arsenal 2024-06-07 09:31:19 +02:00
Anurag Mittal 5854544c17
Added tracer to cloud server 2024-06-07 04:33:37 +02:00
Anurag Mittal ab3464fc95
successful connection to honeycomb 2024-06-06 18:04:23 +02:00
Anurag Mittal 8ff6256c0c
configured exporter to send traces 2024-06-06 14:58:21 +02:00
Anurag Mittal aa6e1a4ffd
added exporter dependencies 2024-06-06 14:47:41 +02:00
Anurag Mittal 6d70e12230
Otel Collector stack setup complete 2024-06-06 14:33:13 +02:00
Anurag Mittal 99a65122d4
S3C-8896: deploy jaeger and zipkin 2024-06-05 13:41:13 +02:00
Anurag Mittal 16047a430c
S3C-8896: deploy collector with prometheus for metrics 2024-06-05 11:10:23 +02:00
Anurag Mittal ce81935205
S3C-8896 run prometheus and grafana 2024-06-05 10:45:52 +02:00
Anurag Mittal b174d7bfdc
S3C-8896 change service name and version 2024-06-05 10:45:23 +02:00
Anurag Mittal 9eec2b7922
S3C-8896 add OTel resource and semantic convention packages 2024-06-05 10:44:45 +02:00
Anurag Mittal 8ff8794287
S3C-8896: Added based Otel instrumentation 2024-06-05 02:01:13 +02:00
Anurag Mittal fecd2d2371
S3C-8896-added-OTel-packages 2024-06-05 02:00:33 +02:00
21 changed files with 2611 additions and 385 deletions

docker-compose.yml Normal file

@@ -0,0 +1,82 @@
# endpoints
# prometheus: http://localhost:9090
# grafana: http://localhost:3000
# jaeger: http://localhost:16686
# zipkin: http://localhost:9411
# otel-collector-metrics: http://localhost:8889
# tempo: http://localhost:3200
# version: '3'
services:
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.102.1
    restart: always
    command:
      - --config=/etc/otelcol-contrib/otel-collector.yml
    volumes:
      - ./docker/collector/otel-collector.yml:/etc/otelcol-contrib/otel-collector.yml
    ports:
      - "1888:1888"   # pprof extension
      - "8888:8888"   # Prometheus metrics exposed by the collector
      - "8889:8889"   # Prometheus exporter metrics
      - "13133:13133" # health_check extension
      - "4317:4317"   # OTLP gRPC receiver
      - "4318:4318"   # OTLP HTTP receiver
      - "55679:55679" # zpages extension
    depends_on:
      - jaeger-all-in-one
      - zipkin-all-in-one
  prometheus:
    container_name: prometheus
    image: prom/prometheus
    restart: always
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"
  loki:
    image: grafana/loki:latest
    command: -config.file=/etc/loki/local-config.yaml
    ports:
      - "3100:3100"
  tempo:
    image: grafana/tempo:latest
    command: [ "-config.file=/etc/tempo.yml" ]
    volumes:
      - ./docker/tempo/tempo.yml:/etc/tempo.yml
    ports:
      - "3200:3200" # tempo
      - "4317"      # otlp grpc
      - "4318"      # otlp http
  grafana:
    container_name: grafana
    image: grafana/grafana
    volumes:
      - ./docker/grafana/grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
    ports:
      - "3000:3000"
  # Jaeger
  jaeger-all-in-one:
    image: jaegertracing/all-in-one:latest
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - JAEGER_QUERY_MAX_CLOCK_SKEW_ADJUSTMENT=0s
    restart: always
    ports:
      - "16686:16686"
      - "4318"
      - "4317"
  # Zipkin
  zipkin-all-in-one:
    image: openzipkin/zipkin:latest
    restart: always
    ports:
      - "9411:9411"

docker/collector/otel-collector.yml Normal file

@@ -0,0 +1,72 @@
receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
      grpc:
        endpoint: 0.0.0.0:4317

processors:
  # batch metrics before sending to reduce API usage
  batch:

exporters:
  logging:
    loglevel: debug
  loki:
    endpoint: "http://loki:3100/loki/api/v1/push"
  prometheus:
    endpoint: "0.0.0.0:8889"
    const_labels:
      label1: value1
  otlphttp/tempo:
    endpoint: "http://tempo:4318"
    tls:
      insecure: true
  otlp:
    endpoint: "api.eu1.honeycomb.io:443"
    headers:
      "x-honeycomb-team": ""
  otlphttp/jaeger:
    endpoint: "http://jaeger-all-in-one:4318"
    tls:
      insecure: true
  # datadog:
  #   api:
  #     key: ""
  zipkin:
    endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
    format: proto

# https://github.com/open-telemetry/opentelemetry-collector/blob/main/extension/README.md
extensions:
  # responsible for responding to health check calls on behalf of the collector.
  health_check:
  # fetches the collector's performance data
  pprof:
  # serves as an http endpoint that provides live debugging data about instrumented components.
  zpages:

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [otlphttp/tempo, otlphttp/jaeger, otlp, zipkin]
      # exporters: [otlphttp/jaeger, zipkin]
    logs:
      receivers: [otlp]
      exporters: [loki]

docker/grafana/grafana-datasources.yml Normal file

@@ -0,0 +1,47 @@
apiVersion: 1

datasources:
  - name: Prometheus
    type: prometheus
    uid: prometheus
    access: proxy
    orgId: 1
    url: http://prometheus:9090
    basicAuth: false
    isDefault: false
    version: 1
    editable: false
    jsonData:
      httpMethod: GET
  - name: Loki
    type: loki
    uid: loki
    access: proxy
    orgId: 1
    url: http://loki:3100
    basicAuth: false
    isDefault: false
    version: 1
    editable: false
  - name: Tempo
    type: tempo
    access: proxy
    orgId: 1
    url: http://tempo:3200
    basicAuth: false
    isDefault: true
    version: 1
    editable: false
    apiVersion: 1
    uid: tempo
    jsonData:
      httpMethod: GET
      serviceMap:
        datasourceUid: prometheus
      tracesToLogsV2:
        datasourceUid: loki
        spanStartTimeShift: '-1h'
        spanEndTimeShift: '1h'
        filterByTraceID: true
        filterBySpanID: true
        tags: [ { key: 'service.name', value: 'job' } ]

docker/prometheus/prometheus.yml Normal file

@@ -0,0 +1,15 @@
global:
  scrape_interval: 10s
  evaluation_interval: 10s

scrape_configs:
  - job_name: 'otel-collector'
    static_configs:
      - targets: ['otel-collector:8889']
  # For direct metrics collection from service to Prometheus
  # - job_name: 'cloudserver-service-app'
  #   metrics_path: /metrics
  #   static_configs:
  #     - targets: ['host.docker.internal:8002']

docker/tempo/tempo.yml Normal file

@@ -0,0 +1,46 @@
server:
  http_listen_port: 3200

distributor:
  receivers:        # this configuration will listen on all ports and protocols that tempo is capable of.
    jaeger:         # the receivers all come from the OpenTelemetry collector. more configuration information can
      protocols:    # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
        thrift_http:
        grpc:       # for a production deployment you should only enable the receivers you need!
        thrift_binary:
        thrift_compact:
    zipkin:
    otlp:
      protocols:
        http:
        grpc:
    opencensus:

ingester:
  max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally

compactor:
  compaction:
    block_retention: 1h # overall Tempo trace retention. set for demo purposes

metrics_generator:
  registry:
    external_labels:
      source: tempo
      cluster: docker-compose
  storage:
    path: /tmp/tempo/generator/wal
    remote_write:
      - url: http://prometheus:9090/api/v1/write
        send_exemplars: true

storage:
  trace:
    backend: local # backend configuration to use
    wal:
      path: /tmp/tempo/wal # where to store the wal locally
    local:
      path: /tmp/tempo/blocks

overrides:
  metrics_generator_processors: [service-graphs, span-metrics] # enables metrics generator

instrumentation.js Normal file

@@ -0,0 +1,72 @@
const opentelemetry = require('@opentelemetry/sdk-node');
const { WebTracerProvider } = require('@opentelemetry/sdk-trace-web');
const { Resource } = require('@opentelemetry/resources');
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-proto');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const { IORedisInstrumentation } = require('@opentelemetry/instrumentation-ioredis');
const {
    SEMRESATTRS_SERVICE_NAME,
    SEMRESATTRS_SERVICE_VERSION,
} = require('@opentelemetry/semantic-conventions');

// Define resource with service name and version
const resource = new Resource({
    [SEMRESATTRS_SERVICE_NAME]: process.env.OTEL_SERVICE_NAME || 'cloudserver-service',
    [SEMRESATTRS_SERVICE_VERSION]: '7.70.47',
});

// OTLP Trace Exporter configuration
const traceExporter = new OTLPTraceExporter({
    url: `http://${process.env.OPENTLEMETRY_COLLECTOR_HOST || 'localhost'}:${process.env.OPENTLEMETRY_COLLECTOR_PORT || 4318}/v1/traces`,
    headers: {},
});

// Metric Reader configuration
const metricReader = new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
        url: `http://${process.env.OPENTLEMETRY_COLLECTOR_HOST || 'localhost'}:${process.env.OPENTLEMETRY_COLLECTOR_PORT || 4318}/v1/metrics`,
        headers: {},
        concurrencyLimit: 1,
    }),
});

// Node SDK configuration
const sdk = new opentelemetry.NodeSDK({
    traceExporter,
    resource,
    metricReader,
    instrumentations: [
        getNodeAutoInstrumentations({
            '@opentelemetry/instrumentation-fs': {
                enabled: false,
            },
            '@opentelemetry/instrumentation-http': {
                responseHook: (span, operations) => {
                    span.updateName(`${operations.req.protocol} ${operations.req.method} ${operations.req.path.split('&')[0]}`);
                },
            },
        }),
        new IORedisInstrumentation({
            requestHook: (span, { cmdName, cmdArgs }) => {
                span.updateName(`Redis:: ${cmdName.toUpperCase()} cache operation for ${cmdArgs[0].split(':')[0]}`);
            },
            // see the instrumentation-ioredis docs for the available configuration options
        }),
    ],
});

// Additional WebTracerProvider configuration
// This initializes a TracerProvider that lets us create tracers
const webTracerProvider = new WebTracerProvider({ resource });
const webSpanProcessor = new BatchSpanProcessor(traceExporter);
webTracerProvider.addSpanProcessor(webSpanProcessor);
webTracerProvider.register();

// Start the Node SDK
sdk.start();
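Note: instrumentation.js only wires up the SDK, exporters, and auto-instrumentations; the hand-written spans in the diffs below are created through the global tracer registry of @opentelemetry/api. A minimal sketch of that pattern, with hypothetical span and attribute names (illustrative only, not part of the changeset):

const opentelemetry = require('@opentelemetry/api');

// Works only after instrumentation.js has been loaded (e.g. node -r ./instrumentation.js server.js),
// so that the SDK has registered a global tracer provider.
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);

tracer.startActiveSpan('example operation', span => {
    span.addEvent('doing work');
    span.setAttribute('code.function', 'example()');
    span.end();
});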

lib/api/api.js

@@ -1,5 +1,6 @@
const { auth, errors, policies } = require('arsenal');
const async = require('async');
const opentelemetry = require('@opentelemetry/api');

const bucketDelete = require('./bucketDelete');
const bucketDeleteCors = require('./bucketDeleteCors');
@@ -75,215 +76,249 @@ auth.setHandler(vault);
/* eslint-disable no-param-reassign */
const api = {
    callApiMethod(apiMethod, request, response, log, callback, tracer) {
        return tracer.startActiveSpan('Using Cloudserver to process API related operations', methodSpan => {
            methodSpan.addEvent('Cloudserver::callApiMethod() Processing API related operations');
            methodSpan.setAttribute('code.function', 'callApiMethod()');
            methodSpan.setAttribute('code.filepath', 'lib/api/api.js');
            methodSpan.setAttribute('code.lineno', 84);
            // Attach the apiMethod method to the request, so it can used by monitoring in the server
            // eslint-disable-next-line no-param-reassign
            request.apiMethod = apiMethod;
            const actionLog = monitoringMap[apiMethod];
            if (!actionLog &&
                apiMethod !== 'websiteGet' &&
                apiMethod !== 'websiteHead' &&
                apiMethod !== 'corsPreflight') {
                log.error('callApiMethod(): No actionLog for this api method', {
                    apiMethod,
                });
            }
            log.addDefaultFields({
                service: 's3',
                action: actionLog,
                bucketName: request.bucketName,
            });
            if (request.objectKey) {
                log.addDefaultFields({
                    objectKey: request.objectKey,
                });
            }
            let returnTagCount = true;

            const validationRes = validateQueryAndHeaders(request, log);
            if (validationRes.error) {
                log.debug('request query / header validation failed', {
                    error: validationRes.error,
                    method: 'api.callApiMethod',
                });
                return process.nextTick(callback, validationRes.error);
            }

            // no need to check auth on website or cors preflight requests
            if (apiMethod === 'websiteGet' || apiMethod === 'websiteHead' ||
                apiMethod === 'corsPreflight') {
                request.actionImplicitDenies = false;
                return this[apiMethod](request, log, callback);
            }

            const { sourceBucket, sourceObject, sourceVersionId, parsingError } =
                parseCopySource(apiMethod, request.headers['x-amz-copy-source']);
            if (parsingError) {
                log.debug('error parsing copy source', {
                    error: parsingError,
                });
                return process.nextTick(callback, parsingError);
            }

            const { httpHeadersSizeError } = checkHttpHeadersSize(request.headers);
            if (httpHeadersSizeError) {
                log.debug('http header size limit exceeded', {
                    error: httpHeadersSizeError,
                });
                return process.nextTick(callback, httpHeadersSizeError);
            }

            const requestContexts = prepareRequestContexts(apiMethod, request,
                sourceBucket, sourceObject, sourceVersionId);
            // Extract all the _apiMethods and store them in an array
            const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
            // Attach the names to the current request
            // eslint-disable-next-line no-param-reassign
            request.apiMethods = apiMethods;

            function checkAuthResults(authResults) {
                let returnTagCount = true;
                const isImplicitDeny = {};
                let isOnlyImplicitDeny = true;
                if (apiMethod === 'objectGet') {
                    // first item checks s3:GetObject(Version) action
                    if (!authResults[0].isAllowed && !authResults[0].isImplicit) {
                        log.trace('get object authorization denial from Vault');
                        return errors.AccessDenied;
                    }
                    // TODO add support for returnTagCount in the bucket policy
                    // checks
                    isImplicitDeny[authResults[0].action] = authResults[0].isImplicit;
                    // second item checks s3:GetObject(Version)Tagging action
                    if (!authResults[1].isAllowed) {
                        log.trace('get tagging authorization denial ' +
                            'from Vault');
                        returnTagCount = false;
                    }
                } else {
                    for (let i = 0; i < authResults.length; i++) {
                        isImplicitDeny[authResults[i].action] = true;
                        if (!authResults[i].isAllowed && !authResults[i].isImplicit) {
                            // Any explicit deny rejects the current API call
                            log.trace('authorization denial from Vault');
                            return errors.AccessDenied;
                        }
                        if (authResults[i].isAllowed) {
                            // If the action is allowed, the result is not implicit
                            // Deny.
                            isImplicitDeny[authResults[i].action] = false;
                            isOnlyImplicitDeny = false;
                        }
                    }
                }
                // These two APIs cannot use ACLs or Bucket Policies, hence, any
                // implicit deny from vault must be treated as an explicit deny.
                if ((apiMethod === 'bucketPut' || apiMethod === 'serviceGet') && isOnlyImplicitDeny) {
                    return errors.AccessDenied;
                }
                return { returnTagCount, isImplicitDeny };
            }

            return async.waterfall([
                next => tracer.startActiveSpan('Authentication of user against IAM', authUserSpan => {
                    authUserSpan.setAttribute('code.function', 'auth.server.doAuth()');
                    authUserSpan.setAttribute('code.filepath', 'lib/api/api.js');
                    authUserSpan.setAttribute('code.lineno', 197);
                    return next(null, authUserSpan);
                }),
                (authSpan, next) => auth.server.doAuth(
                    request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
                        if (err) {
                            log.trace('authentication error', { error: err });
                            authSpan.end();
                            return next(err);
                        }
                        const authNames = { accountName: userInfo.getAccountDisplayName() };
                        authSpan.setAttribute('user.accountName', authNames.accountName);
                        if (authNames.userName) {
                            authSpan.setAttribute('user.userName', authNames.userName);
                        }
                        authSpan.end();
                        return next(null, userInfo, authorizationResults, streamingV4Params);
                    }, 's3', requestContexts),
                (userInfo, authorizationResults, streamingV4Params, next) => {
                    const authNames = { accountName: userInfo.getAccountDisplayName() };
                    if (userInfo.isRequesterAnIAMUser()) {
                        authNames.userName = userInfo.getIAMdisplayName();
                    }
                    log.addDefaultFields(authNames);
                    if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
                        return next(null, userInfo, authorizationResults, streamingV4Params);
                    }
                    // issue 100 Continue to the client
                    writeContinue(request, response);
                    const MAX_POST_LENGTH = request.method === 'POST' ?
                        1024 * 1024 : 1024 * 1024 / 2; // 1 MB or 512 KB
                    const post = [];
                    let postLength = 0;
                    request.on('data', chunk => {
                        postLength += chunk.length;
                        // Sanity check on post length
                        if (postLength <= MAX_POST_LENGTH) {
                            post.push(chunk);
                        }
                    });

                    request.on('error', err => {
                        log.trace('error receiving request', {
                            error: err,
                        });
                        return next(errors.InternalError);
                    });

                    request.on('end', () => {
                        if (postLength > MAX_POST_LENGTH) {
                            log.error('body length is too long for request type',
                                { postLength });
                            return next(errors.InvalidRequest);
                        }
                        // Convert array of post buffers into one string
                        request.post = Buffer.concat(post, postLength).toString();
                        return next(null, userInfo, authorizationResults, streamingV4Params);
                    });
                    return undefined;
                },
                // Tag condition keys require information from CloudServer for evaluation
                (userInfo, authorizationResults, streamingV4Params, next) => tagConditionKeyAuth(
                    authorizationResults,
                    request,
                    requestContexts,
                    apiMethod,
                    log,
                    (err, authResultsWithTags) => {
                        methodSpan.addEvent('Authentication of user completed');
                        if (err) {
                            log.trace('tag authentication error', { error: err });
                            return next(err);
                        }
                        return next(null, userInfo, authResultsWithTags, streamingV4Params);
                    },
                ),
            ], (err, userInfo, authorizationResults, streamingV4Params) => {
                if (err) {
                    return callback(err);
                }
                if (authorizationResults) {
                    const checkedResults = checkAuthResults(authorizationResults);
                    if (checkedResults instanceof Error) {
                        return callback(checkedResults);
                    }
                    returnTagCount = checkedResults.returnTagCount;
                    request.actionImplicitDenies = checkedResults.isImplicitDeny;
                } else {
                    // create an object of keys apiMethods with all values to false:
                    // for backward compatibility, all apiMethods are allowed by default
                    // thus it is explicitly allowed, so implicit deny is false
                    request.actionImplicitDenies = apiMethods.reduce((acc, curr) => {
                        acc[curr] = false;
                        return acc;
                    }, {});
                }
                const ctx = opentelemetry.trace.setSpan(
                    opentelemetry.context.active(),
                    methodSpan,
                );
                const apiSpan = tracer.startSpan(`API operation for ${apiMethod}`, undefined, ctx);
                if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
                    request._response = response;
                    return this[apiMethod](userInfo, request, streamingV4Params,
                        log, callback, authorizationResults, apiSpan, methodSpan);
                }
                if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
                    return this[apiMethod](userInfo, request, sourceBucket,
                        sourceObject, sourceVersionId, log, callback, apiSpan);
                }
                if (apiMethod === 'objectGet') {
                    return this[apiMethod](userInfo, request, returnTagCount, log, callback, apiSpan, methodSpan);
                }
                return this[apiMethod](userInfo, request, log, (err, res, corsHeaders) => {
                    methodSpan.addEvent('Cloudserver::callApiMethod() API operation completed, sending response to client');
                    apiSpan.end();
                    return process.nextTick(() => {
                        methodSpan.end();
                        if (err) {
                            return callback(err);
                        }
                        return callback(null, res, corsHeaders);
                    });
                }, apiSpan);
            });
        });
    },

    bucketDelete,
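Every instrumented handler below repeats one parenting pattern: the request-level work runs inside tracer.startActiveSpan(), and child spans attach to it through a Context that carries the parent span, passed as the third argument of startSpan(). A condensed sketch of that pattern with hypothetical names (illustrative only, not code from the diff):

const opentelemetry = require('@opentelemetry/api');

function tracedOperation(tracer, work, callback) {
    return tracer.startActiveSpan('method span', methodSpan => {
        // Derive a context in which methodSpan is the active span.
        const ctx = opentelemetry.trace.setSpan(opentelemetry.context.active(), methodSpan);
        // Spans started with this context become children of methodSpan.
        const apiSpan = tracer.startSpan('api span', undefined, ctx);
        return work(err => {
            apiSpan.end();
            methodSpan.end();
            return callback(err);
        });
    });
}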

lib/api/apiUtils/object/createAndStoreObject.js

@@ -1,6 +1,7 @@
const async = require('async');
const { errors, s3middleware } = require('arsenal');
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
const opentelemetry = require('@opentelemetry/api');

const constants = require('../../../../constants');
const { data } = require('../../../data/wrapper');

@@ -46,8 +47,30 @@ function _checkAndApplyZenkoMD(metaHeaders, request, isDeleteMarker) {
}

function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
    metadataStoreParams, dataToDelete, log, requestMethod, callback, ctx, tracer) {
    if (tracer) {
        return tracer.startActiveSpan('Updating metadata', undefined, ctx, metadataSpan => {
            return services.metadataStoreObject(bucketName, dataGetInfo,
                cipherBundle, metadataStoreParams, (err, result) => {
                    if (err) {
                        return callback(err);
                    }
                    if (dataToDelete) {
                        const newDataStoreName = Array.isArray(dataGetInfo) ?
                            dataGetInfo[0].dataStoreName : null;
                        return tracer.startActiveSpan('Deleting relevant metadata', undefined, ctx, deleteMetadataSpan => {
                            return data.batchDelete(dataToDelete, requestMethod,
                                newDataStoreName, log, err => {
                                    deleteMetadataSpan.end();
                                    return callback(err, result, metadataSpan);
                                });
                        });
                    }
                    return callback(null, result);
                });
        });
    }
    return services.metadataStoreObject(bucketName, dataGetInfo,
        cipherBundle, metadataStoreParams, (err, result) => {
            if (err) {
                return callback(err);

@@ -85,7 +108,8 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
 */
function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
    canonicalID, cipherBundle, request, isDeleteMarker, streamingV4Params,
    overheadField, log, callback, apiSpan, callAPIMethodSpan) {
    apiSpan.addEvent('preparing object data and metadata to create object');
    const size = isDeleteMarker ? 0 : request.parsedContentLength;
    // although the request method may actually be 'DELETE' if creating a
    // delete marker, for our purposes we consider this to be a 'PUT'

@@ -203,17 +227,29 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
    /* eslint-disable camelcase */
    const dontSkipBackend = externalBackends;
    /* eslint-enable camelcase */
    apiSpan.addEvent('object data and metadata prepared, starting storage procedure');
    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        apiSpan,
    );

    return async.waterfall([
        function storeData(next) {
            // const datastoreSpan = tracer.startSpan('Storing Data using sproxyd', undefined, ctx);
            if (size === 0 && !dontSkipBackend[locationType]) {
                metadataStoreParams.contentMD5 = constants.emptyFileMd5;
                return next(null, null, null);
            }
            return dataStore(objectKeyContext, cipherBundle, request, size,
                streamingV4Params, backendInfo, log, (err, dataGetInfo, calculatedHash, dataStoreSpan) => {
                    if (dataStoreSpan) {
                        dataStoreSpan.end();
                    }
                    return next(err, dataGetInfo, calculatedHash);
                }, ctx, tracer);
        },
        function processDataResult(dataGetInfo, calculatedHash, next) {
            const processDataSpan = tracer.startSpan('Processing data stored for generating MD', undefined, ctx);
            if (dataGetInfo === null || dataGetInfo === undefined) {
                return next(null, null);
            }

@@ -234,7 +270,10 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
                cipherBundle.cipheredDataKey;
            }
            metadataStoreParams.contentMD5 = calculatedHash;
            return process.nextTick(() => {
                processDataSpan.end();
                return next(null, dataGetInfoArr);
            });
        },
        function getVersioningInfo(infoArr, next) {
            return versioningPreprocessing(bucketName, bucketMD,

@@ -262,7 +301,12 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
            }
            return _storeInMDandDeleteData(bucketName, infoArr,
                cipherBundle, metadataStoreParams,
                options.dataToDelete, log, requestMethod, (err, result, metadataSpan) => {
                    if (metadataSpan) {
                        metadataSpan.end();
                    }
                    return next(err, result);
                }, ctx, tracer);
        },
    ], callback);
}

lib/api/apiUtils/object/storeObject.js

@@ -16,7 +16,7 @@ const { prepareStream } = require('./prepareStream');
 * @return {function} - calls callback with arguments:
 * error, dataRetrievalInfo, and completedHash (if any)
 */
function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb, dataStoreSpan) {
    const contentMD5 = stream.contentMD5;
    const completedHash = hashedStream.completedHash;
    if (contentMD5 && completedHash && contentMD5 !== completedHash) {

@@ -36,7 +36,7 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
            return cb(errors.BadDigest);
        });
    }
    return cb(null, dataRetrievalInfo, completedHash, dataStoreSpan);
}

/**

@@ -56,12 +56,37 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
 * @return {undefined}
 */
function dataStore(objectContext, cipherBundle, stream, size,
    streamingV4Params, backendInfo, log, cb, ctx, tracer) {
    const cbOnce = jsutil.once(cb);
    const dataStream = prepareStream(stream, streamingV4Params, log, cbOnce);
    if (!dataStream) {
        return process.nextTick(() => cb(errors.InvalidArgument));
    }
    if (tracer) {
        return tracer.startActiveSpan('Storing Data using sproxyd', undefined, ctx, dataStoreSpan => {
            return data.put(
                cipherBundle, dataStream, size, objectContext, backendInfo, log,
                (err, dataRetrievalInfo, hashedStream) => {
                    if (err) {
                        log.error('error in datastore', {
                            error: err,
                        });
                        return cbOnce(err);
                    }
                    if (!dataRetrievalInfo) {
                        log.fatal('data put returned neither an error nor a key', {
                            method: 'storeObject::dataStore',
                        });
                        return cbOnce(errors.InternalError);
                    }
                    log.trace('dataStore: backend stored key', {
                        dataRetrievalInfo,
                    });
                    return checkHashMatchMD5(stream, hashedStream,
                        dataRetrievalInfo, log, cbOnce, dataStoreSpan);
                });
        });
    }
    return data.put(
        cipherBundle, dataStream, size, objectContext, backendInfo, log,
        (err, dataRetrievalInfo, hashedStream) => {

lib/api/bucketGet.js

@@ -1,5 +1,6 @@
const querystring = require('querystring');
const { errors, versioning, s3middleware } = require('arsenal');
const opentelemetry = require('@opentelemetry/api');

const constants = require('../../constants');
const services = require('../services');

@@ -261,7 +262,12 @@ function processMasterVersions(bucketName, listParams, list) {
}

function handleResult(listParams, requestMaxKeys, encoding, authInfo,
    bucketName, list, corsHeaders, log, callback, traceContext) {
    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
    const handleResultSpan = tracer.startSpan('Preparing result', undefined, traceContext);
    handleResultSpan.setAttribute('code.function', 'handleResult()');
    handleResultSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
    handleResultSpan.setAttribute('code.lineno', 270);
    // eslint-disable-next-line no-param-reassign
    listParams.maxKeys = requestMaxKeys;
    // eslint-disable-next-line no-param-reassign

@@ -275,6 +281,7 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
    }
    pushMetric('listBucket', log, { authInfo, bucket: bucketName });
    monitoring.promMetrics('GET', bucketName, '200', 'listBucket');
    handleResultSpan.end();
    return callback(null, res, corsHeaders);
}

@@ -288,7 +295,16 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
 * with either error code or xml response body
 * @return {undefined}
 */
function bucketGet(authInfo, request, log, callback, parentSpan) {
    parentSpan.addEvent('Cloudserver::bucketGet() processing bucketGet request');
    parentSpan.setAttribute('code.function', 'bucketGet()');
    parentSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
    parentSpan.setAttribute('code.lineno', 299);
    // const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        parentSpan,
    );
    const params = request.query;
    const bucketName = request.bucketName;
    const v2 = params['list-type'];

@@ -350,7 +366,7 @@ function bucketGet(authInfo, request, log, callback) {
    } else {
        listParams.marker = params.marker;
    }
    // parentSpan.addEvent('Cloudserver::bucketGet() validating Metadata');
    standardMetadataValidateBucket(metadataValParams, request.actionImplicitDenies, log, (err, bucket) => {
        const corsHeaders = collectCorsHeaders(request.headers.origin,
            request.method, bucket);

@@ -367,7 +383,9 @@ function bucketGet(authInfo, request, log, callback) {
            listParams.versionIdMarker = params['version-id-marker'] ?
                versionIdUtils.decode(params['version-id-marker']) : undefined;
        }
        // parentSpan.addEvent('Cloudserver::bucketGet() Getting object listing');
        if (!requestMaxKeys) {
            parentSpan.addEvent('Cloudserver::bucketGet() Empty bucket');
            const emptyList = {
                CommonPrefixes: [],
                Contents: [],

@@ -375,7 +393,7 @@ function bucketGet(authInfo, request, log, callback) {
                IsTruncated: false,
            };
            return handleResult(listParams, requestMaxKeys, encoding, authInfo,
                bucketName, emptyList, corsHeaders, log, callback, ctx);
        }
        return services.getObjectListing(bucketName, listParams, log,
            (err, list) => {

@@ -386,9 +404,9 @@ function bucketGet(authInfo, request, log, callback) {
                return callback(err, null, corsHeaders);
            }
            return handleResult(listParams, requestMaxKeys, encoding, authInfo,
                bucketName, list, corsHeaders, log, callback, ctx);
        }, ctx);
    }, ctx);
    return undefined;
}

lib/api/bucketHead.js

@@ -1,5 +1,6 @@
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const { standardMetadataValidateBucket } = require('../metadata/metadataUtils');
const opentelemetry = require('@opentelemetry/api');

const { pushMetric } = require('../utapi/utilities');
const monitoring = require('../utilities/metrics');

@@ -13,7 +14,15 @@ const monitoring = require('../utilities/metrics');
 * with either error code or success
 * @return {undefined}
 */
function bucketHead(authInfo, request, log, callback, parentSpan) {
    parentSpan.addEvent('Cloudserver::bucketHead() processing Head Bucket request');
    parentSpan.setAttribute('code.function', 'bucketHead()');
    parentSpan.setAttribute('code.filepath', 'lib/api/bucketHead.js');
    parentSpan.setAttribute('code.lineno', 20);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        parentSpan,
    );
    log.debug('processing request', { method: 'bucketHead' });
    const bucketName = request.bucketName;
    const metadataValParams = {

@@ -38,7 +47,7 @@ function bucketHead(authInfo, request, log, callback) {
            'x-amz-bucket-region': bucket.getLocationConstraint(),
        };
        return callback(null, Object.assign(corsHeaders, headers));
    }, ctx);
}

module.exports = bucketHead;

lib/api/objectDelete.js

@@ -1,6 +1,7 @@
/* eslint-disable indent */
const async = require('async');
const { errors, versioning } = require('arsenal');
const opentelemetry = require('@opentelemetry/api');

const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const services = require('../services');

@@ -28,7 +29,16 @@ const { overheadField } = require('../../constants');
 * @param {function} cb - final cb to call with the result and response headers
 * @return {undefined}
 */
function objectDelete(authInfo, request, log, cb, apiSpan) {
    apiSpan.addEvent('Cloudserver::objectDelete() processing objectDelete request');
    apiSpan.setAttribute('code.function', 'objectDelete()');
    apiSpan.setAttribute('code.filepath', 'lib/api/objectDelete.js');
    apiSpan.setAttribute('code.lineno', 50);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        apiSpan,
    );
    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
    log.debug('processing request', { method: 'objectDelete' });
    if (authInfo.isRequesterPublicUser()) {
        log.debug('operation not available for public user');

@@ -97,9 +107,10 @@ function objectDelete(authInfo, request, log, cb) {
                });
            }
            return next(null, bucketMD, objMD);
        }, ctx);
    },
    function checkGovernanceBypassHeader(bucketMD, objectMD, next) {
        apiSpan.addEvent('Check governance bypass header');
        // AWS only returns an object lock error if a version id
        // is specified, else continue to create a delete marker
        if (!reqVersionId) {

@@ -112,12 +123,15 @@ function objectDelete(authInfo, request, log, cb) {
                log.debug('user does not have BypassGovernanceRetention and object is locked');
                return next(err, bucketMD);
            }
            apiSpan.addEvent('Checked governance bypass header');
            return next(null, hasGovernanceBypass, bucketMD, objectMD);
        });
    }
    apiSpan.addEvent('Checked governance bypass header');
    return next(null, hasGovernanceBypass, bucketMD, objectMD);
},
function evaluateObjectLockPolicy(hasGovernanceBypass, bucketMD, objectMD, next) {
    apiSpan.addEvent('Evaluate object lock policy');
    // AWS only returns an object lock error if a version id
    // is specified, else continue to create a delete marker
    if (!reqVersionId) {

@@ -135,7 +149,7 @@ function objectDelete(authInfo, request, log, cb) {
        log.debug('trying to delete locked object');
        return next(objectLockedError, bucketMD);
    }
    apiSpan.addEvent('Object lock policy evaluation done');
    return next(null, bucketMD, objectMD);
},
function deleteOperation(bucketMD, objectMD, next) {

@@ -161,10 +175,14 @@ function objectDelete(authInfo, request, log, cb) {
    if (!_bucketRequiresOplogUpdate(bucketMD)) {
        delOptions.doesNotNeedOpogUpdate = true;
    }
    // return tracer.startActiveSpan('Deleting object', undefined, ctx, deleteObjectSpan => services.deleteObject(
    //     bucketName, objectMD, objectKey, delOptions, false, log, (err, delResult) => {
    //         deleteObjectSpan.end();
    //         return next(err, bucketMD, objectMD, delResult, deleteInfo);
    //     }));
    return services.deleteObject(bucketName, objectMD, objectKey,
        delOptions, false, log, (err, delResult) => next(err, bucketMD,
            objectMD, delResult, deleteInfo), ctx);
}
// putting a new delete marker
deleteInfo.newDeleteMarker = true;

@@ -257,6 +275,7 @@ function objectDelete(authInfo, request, log, cb) {
            monitoring.promMetrics('DELETE', bucketName, '200', 'deleteObject',
                Number.parseInt(objectMD['content-length'], 10));
        }
        apiSpan.end();
        return cb(err, resHeaders);
    });
}

lib/api/objectGet.js

@@ -1,5 +1,6 @@
const { errors, s3middleware } = require('arsenal');
const { parseRange } = require('arsenal').network.http.utils;
const opentelemetry = require('@opentelemetry/api');

const { data } = require('../data/wrapper');

@@ -25,7 +26,15 @@ const validateHeaders = s3middleware.validateConditionalHeaders;
 * @param {function} callback - callback to function in route
 * @return {undefined}
 */
function objectGet(authInfo, request, returnTagCount, log, callback, apiSpan, callAPIMethodSpan) {
    apiSpan.addEvent('Cloudserver::objectGet() processing objectGet request');
    apiSpan.setAttribute('code.function', 'objectGet()');
    apiSpan.setAttribute('code.filepath', 'lib/api/objectGet.js');
    apiSpan.setAttribute('code.lineno', 32);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        apiSpan,
    );
    log.debug('processing request', { method: 'objectGet' });
    const bucketName = request.bucketName;
    const objectKey = request.objectKey;

@@ -64,6 +73,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
            return callback(err, null, corsHeaders);
        }
        if (!objMD) {
            apiSpan.addEvent(versionId ? 'Cloudserver::objectGet() Object version not found' : 'Cloudserver::objectGet() Object not found');
            const err = versionId ? errors.NoSuchVersion : errors.NoSuchKey;
            monitoring.promMetrics(
                'GET', bucketName, err.code, 'getObject');

@@ -87,6 +97,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
            return callback(errors.MethodNotAllowed, null,
                responseMetaHeaders);
        }
        apiSpan.addEvent('Cloudserver::objectGet() Validating API request headers against object metadata');
        const headerValResult = validateHeaders(request.headers,
            objMD['last-modified'], objMD['content-md5']);
        if (headerValResult.error) {

@@ -233,7 +244,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
                dataLocator = setPartRanges(dataLocator, byteRange);
            }
        }
        apiSpan.addEvent('Cloudserver::objectGet() Trying to get data location using head request');
        return data.head(dataLocator, log, err => {
            apiSpan.addEvent('Cloudserver::objectGet() Data location found');
            if (err) {
                log.error('error from external backend checking for ' +
                    'object existence', { error: err });

@@ -253,9 +266,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
            monitoring.promMetrics('GET', bucketName, '200', 'getObject',
                Number.parseInt(responseMetaHeaders['Content-Length'], 10));
            return callback(null, dataLocator, responseMetaHeaders,
                byteRange, apiSpan, callAPIMethodSpan);
        });
    }, ctx);
}

module.exports = objectGet;

lib/api/objectHead.js

@@ -1,6 +1,7 @@
const { errors, s3middleware } = require('arsenal');
const validateHeaders = s3middleware.validateConditionalHeaders;
const { parseRange } = require('arsenal').network.http.utils;
const opentelemetry = require('@opentelemetry/api');

const { decodeVersionId } = require('./apiUtils/object/versioning');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');

@@ -25,7 +26,15 @@ const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
 * @return {undefined}
 *
 */
function objectHead(authInfo, request, log, callback, parentSpan) {
    parentSpan.addEvent('Cloudserver::objectHead() processing Head Object request');
    parentSpan.setAttribute('code.function', 'objectHead()');
    parentSpan.setAttribute('code.filepath', 'lib/api/objectHead.js');
    parentSpan.setAttribute('code.lineno', 32);
    const ctx = opentelemetry.trace.setSpan(
        opentelemetry.context.active(),
        parentSpan,
    );
    log.debug('processing request', { method: 'objectHead' });
    const bucketName = request.bucketName;
    const objectKey = request.objectKey;

@@ -160,7 +169,7 @@ function objectHead(authInfo, request, log, callback) {
        });
        monitoring.promMetrics('HEAD', bucketName, '200', 'headObject');
        return callback(null, responseHeaders);
    }, ctx);
}

module.exports = objectHead;

lib/api/objectPut.js

@ -1,5 +1,6 @@
const async = require('async'); const async = require('async');
const { errors, versioning } = require('arsenal'); const { errors, versioning } = require('arsenal');
const opentelemetry = require('@opentelemetry/api');
const aclUtils = require('../utilities/aclUtils'); const aclUtils = require('../utilities/aclUtils');
const { cleanUpBucket } = require('./apiUtils/bucket/bucketCreation'); const { cleanUpBucket } = require('./apiUtils/bucket/bucketCreation');
@ -39,9 +40,19 @@ const versionIdUtils = versioning.VersionID;
* (to be used for streaming v4 auth if applicable) * (to be used for streaming v4 auth if applicable)
* @param {object} log - the log request * @param {object} log - the log request
* @param {Function} callback - final callback to call with the result * @param {Function} callback - final callback to call with the result
* @param {object} apiSpan - APi span
* @param {object} methodSpan - method span
* @return {undefined} * @return {undefined}
*/ */
function objectPut(authInfo, request, streamingV4Params, log, callback) { function objectPut(authInfo, request, streamingV4Params, log, callback, authorizationResults, apiSpan, methodSpan) {
apiSpan.addEvent('Cloudserver::objectGet() processing objectPut request');
apiSpan.setAttribute('code.function', 'objectPut()');
apiSpan.setAttribute('code.filepath', 'lib/api/objectPut.js');
apiSpan.setAttribute('code.lineno', 50);
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
apiSpan,
);
log.debug('processing request', { method: 'objectPut' }); log.debug('processing request', { method: 'objectPut' });
const { const {
bucketName, bucketName,
@@ -101,12 +112,14 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
    return async.waterfall([
        function handleTransientOrDeleteBuckets(next) {
+            apiSpan.addEvent('Cloudserver:: handleTransientOrDeleteBuckets');
            if (bucket.hasTransientFlag() || bucket.hasDeletedFlag()) {
                return cleanUpBucket(bucket, canonicalID, log, next);
            }
            return next();
        },
        function getSSEConfig(next) {
+            apiSpan.addEvent('Cloudserver:: getSSEConfig');
            return getObjectSSEConfiguration(headers, bucket, log,
                (err, sseConfig) => {
                    if (err) {
@@ -125,17 +138,20 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
            return next(null, null);
        },
        function objectCreateAndStore(cipherBundle, next) {
+            apiSpan.addEvent('Cloudserver:: objectCreateAndStore');
            const objectLockValidationError
                = validateHeaders(bucket, headers, log);
            if (objectLockValidationError) {
                return next(objectLockValidationError);
            }
            writeContinue(request, request._response);
+            apiSpan.addEvent('Cloudserver:: creating and storing object');
            return createAndStoreObject(bucketName,
                bucket, objectKey, objMD, authInfo, canonicalID, cipherBundle,
-                request, false, streamingV4Params, overheadField, log, next);
+                request, false, streamingV4Params, overheadField, log, next, apiSpan, methodSpan);
        },
    ], (err, storingResult) => {
+        apiSpan.addEvent('Cloudserver:: stored object metadata and data');
        if (err) {
            monitoring.promMetrics('PUT', bucketName, err.code,
                'putObject');
@@ -150,7 +166,7 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
            && !Number.isNaN(request.headers['x-amz-meta-size']))
            ? Number.parseInt(request.headers['x-amz-meta-size'], 10) : null;
        const newByteLength = parsedContentLength;
+        apiSpan.addEvent('Cloudserver:: preparing response headers');
        setExpirationHeaders(responseHeaders, {
            lifecycleConfig: bucket.getLifecycleConfiguration(),
            objectParams: {
@@ -199,12 +215,17 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
                location: bucket.getLocationConstraint(),
                numberOfObjects,
            });
+            apiSpan.end();
            monitoring.promMetrics('PUT', bucketName, '200',
                'putObject', newByteLength, oldByteLength, isVersionedObj,
                null, ingestSize);
-            return callback(null, responseHeaders);
+            return process.nextTick(() => {
+                methodSpan.addEvent('responding to client');
+                methodSpan.end();
+                return callback(null, responseHeaders);
+            });
        });
-    });
+    }, ctx);
}

module.exports = objectPut;

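One detail worth noting in this hunk: the API span is ended before the metrics call, while the method span is ended on process.nextTick() just before the client callback, so the response hand-off is not attributed to either span. A compact sketch of that staging pattern over an async.waterfall, with hypothetical stage names and payloads:

const async = require('async');

function instrumentedPut(apiSpan, methodSpan, done) {
    async.waterfall([
        next => { apiSpan.addEvent('stage: validate'); next(null); },
        next => { apiSpan.addEvent('stage: store'); next(null, { etag: '"abc"' }); },
    ], (err, result) => {
        apiSpan.end(); // API work is done; the events above mark stage boundaries
        process.nextTick(() => {
            methodSpan.end(); // close the outer span just before responding
            done(err, result);
        });
    });
}
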
View File: lib/api/serviceGet.js

@@ -51,8 +51,11 @@ function generateXml(xml, owner, userBuckets, splitter) {
 * @param {function} callback - callback
 * @return {undefined}
 */
-function serviceGet(authInfo, request, log, callback) {
+function serviceGet(authInfo, request, log, callback, parentSpan) {
    log.debug('processing request', { method: 'serviceGet' });
+    parentSpan.setAttribute('code.function', 'serviceGet()');
+    parentSpan.setAttribute('code.filepath', 'lib/api/serviceGet.js');
+    parentSpan.setAttribute('code.lineno', 58);

    if (authInfo.isRequesterPublicUser()) {
        log.debug('operation not available for public user');
@@ -73,6 +76,7 @@ function serviceGet(authInfo, request, log, callback) {
        '</Owner>',
        '<Buckets>'
    );
+    parentSpan.addEvent('getting metadata');
    return services.getService(authInfo, request, log, constants.splitter,
        (err, userBuckets, splitter) => {
            if (err) {

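The code.function / code.filepath / code.lineno keys set throughout these handlers are OpenTelemetry source-code semantic conventions. Rather than hard-coding the strings, the @opentelemetry/semantic-conventions package (added to package.json below) exports constants for these keys; a sketch of using them, assuming the constant names of recent package versions (older releases expose them on a SemanticAttributes object instead):

const {
    SEMATTRS_CODE_FUNCTION,
    SEMATTRS_CODE_FILEPATH,
    SEMATTRS_CODE_LINENO,
} = require('@opentelemetry/semantic-conventions');

function tagSourceLocation(span, fn, file, line) {
    // Same attribute keys as the string literals in the diff, sourced from the spec.
    span.setAttribute(SEMATTRS_CODE_FUNCTION, fn);
    span.setAttribute(SEMATTRS_CODE_FILEPATH, file);
    span.setAttribute(SEMATTRS_CODE_LINENO, line);
}
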
View File: lib/metadata/metadataUtils.js

@@ -1,5 +1,6 @@
const async = require('async');
const { errors } = require('arsenal');
+const opentelemetry = require('@opentelemetry/api');

const metadata = require('./wrapper');
const BucketInfo = require('arsenal').models.BucketInfo;
@@ -181,9 +182,15 @@ function validateBucket(bucket, params, log, actionImplicitDenies = {}) {
 * @param {boolean} actionImplicitDenies - identity authorization results
 * @param {RequestLogger} log - request logger
 * @param {function} callback - callback
+ * @param {object} tracerContext - tracing context
 * @return {undefined} - and call callback with params err, bucket md
 */
-function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback) {
+function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback, tracerContext) {
+    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
+    const bucketPolicyAuthSpan = tracer.startSpan('Retrieving Metadata and validating user against bucket policy', undefined, tracerContext);
+    bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucketAndObj()');
+    bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
+    bucketPolicyAuthSpan.setAttribute('code.lineno', 193);
    const { authInfo, bucketName, objectKey, versionId, getDeleteMarker, request } = params;
    let requestType = params.requestType;
    if (!Array.isArray(requestType)) {
@@ -196,7 +203,15 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
    if (getDeleteMarker) {
        getOptions.getDeleteMarker = true;
    }
+    bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket and object');
+    const ctx = opentelemetry.trace.setSpan(
+        opentelemetry.context.active(),
+        bucketPolicyAuthSpan,
+    );
+    const metadataSpan = tracer.startSpan('Retrieving metadata', undefined, ctx);
    return metadata.getBucketAndObjectMD(bucketName, objectKey, getOptions, log, (err, getResult) => {
+        metadataSpan.end();
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
        if (err) {
            // if some implicit iamAuthzResults, return AccessDenied
            // before leaking any state information
@@ -209,6 +224,7 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
            });
        },
        (getResult, next) => {
+            bucketPolicyAuthSpan.addEvent('Cloudserver:: validating metadata of bucket and object');
            const bucket = getResult.bucket ?
                BucketInfo.deSerialize(getResult.bucket) : undefined;
            if (!bucket) {
@@ -227,9 +243,11 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
            return getNullVersionFromMaster(bucketName, objectKey, log,
                (err, nullVer) => next(err, bucket, nullVer));
        }
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: bucket and object metadata validated');
        return next(null, bucket, objMD);
    },
    (bucket, objMD, next) => {
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy');
        const canonicalID = authInfo.getCanonicalID();
        if (!isObjAuthorized(bucket, objMD, requestType, canonicalID, authInfo, log, request,
            actionImplicitDenies)) {
@@ -239,13 +257,18 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
        return next(null, bucket, objMD);
    },
    ], (err, bucket, objMD) => {
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
        if (err) {
            // still return bucket for cors headers
            return callback(err, bucket);
        }
-        return callback(null, bucket, objMD);
+        return process.nextTick(() => {
+            bucketPolicyAuthSpan.end();
+            return callback(null, bucket, objMD);
+        });
    });
}

/** standardMetadataValidateBucket - retrieve bucket from metadata and check if user
 * is authorized to access it
 * @param {object} params - function parameters
@@ -256,11 +279,20 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
 * @param {boolean} actionImplicitDenies - identity authorization results
 * @param {RequestLogger} log - request logger
 * @param {function} callback - callback
+ * @param {object} tracerContext - tracing context
 * @return {undefined} - and call callback with params err, bucket md
 */
-function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback) {
+function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback, tracerContext) {
+    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
+    const bucketPolicyAuthSpan = tracer.startSpan('Validating user against bucket policy', undefined, tracerContext);
+    bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucket()');
+    bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
+    bucketPolicyAuthSpan.setAttribute('code.lineno', 267);
    const { bucketName } = params;
+    bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket');
    return metadata.getBucket(bucketName, log, (err, bucket) => {
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
        if (err) {
            // if some implicit actionImplicitDenies, return AccessDenied before
            // leaking any state information
@@ -270,8 +302,13 @@ function standardMetadataValidateBucket(params, actionImplicitDenies, log, callb
            log.debug('metadata getbucket failed', { error: err });
            return callback(err);
        }
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy');
        const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
-        return callback(validationError, bucket);
+        bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
+        return process.nextTick(() => {
+            bucketPolicyAuthSpan.end();
+            return callback(validationError, bucket);
+        });
    });
}

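The metadataSpan above times just the metadata round trip inside the broader auth span. A generic helper in the same spirit (a sketch; the span name, helper name, and service-name fallback are illustrative, only the env var mirrors the diff):

const opentelemetry = require('@opentelemetry/api');

// Run an err-first-callback operation under its own child span.
function timeOperation(name, parentSpan, operation, callback) {
    const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME || 'cloudserver');
    const ctx = opentelemetry.trace.setSpan(opentelemetry.context.active(), parentSpan);
    const span = tracer.startSpan(name, undefined, ctx);
    operation((err, ...results) => {
        span.end(); // end before surfacing the result, on success and failure alike
        callback(err, ...results);
    });
}
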
View File: lib/server.js

@@ -3,6 +3,7 @@ const https = require('https');
const cluster = require('cluster');
const { series } = require('async');
const arsenal = require('arsenal');
+const opentelemetry = require('@opentelemetry/api');
const { RedisClient, StatsClient } = arsenal.metrics;
const monitoringClient = require('./utilities/monitoringHandler');
const metrics = require('./utilities/metrics');
@@ -89,6 +90,16 @@ class S3Server {
     * @returns {void}
     */
    routeRequest(req, res) {
+        const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
+        let activeSpan = opentelemetry.trace.getActiveSpan(opentelemetry.context.active());
+        if (!activeSpan) {
+            activeSpan = tracer.startSpan('routeRequest');
+        }
+        activeSpan.addEvent('Cloudserver::routeRequest() Starting routing of request');
+        activeSpan.setAttribute('code.function', 'routeRequest()');
+        activeSpan.setAttribute('code.filepath', 'lib/server.js');
+        activeSpan.setAttribute('code.lineno', 98);
        metrics.httpActiveRequests.inc();
        const requestStartTime = process.hrtime.bigint();
@@ -140,7 +151,18 @@ class S3Server {
                vault,
            },
        };
-        routes(req, res, params, logger, _config);
+        // activeSpan.addEvent('I am going to route the request now');
+        // activeSpan.setAttribute('s3.bucket_name', req.bucketName);
+        // activeSpan.setAttribute('s3.object_key', req.objectKey);
+        // activeSpan.updateName(`${req.method} ${req.bucketName}/${req.objectKey}`);
+        // const v4Span = tracer.startSpan('verifySignatureV4', {
+        //     parent: activeSpan,
+        // });
+        routes(req, res, params, logger, _config, activeSpan, tracer);
+        // return tracer.startActiveSpan('routeRequest', span => {
+        //     routes(req, res, params, logger, _config);
+        //     span.end();
+        // });
    }

    /**

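routeRequest either adopts the span already opened per request by the HTTP auto-instrumentation (from @opentelemetry/auto-instrumentations-node) or falls back to a fresh root span. Note that in @opentelemetry/api, trace.getActiveSpan() takes no argument and reads the active context itself, so the extra context argument in the diff is redundant though harmless. A sketch of the fallback, with an illustrative helper name:

const opentelemetry = require('@opentelemetry/api');

function getOrStartSpan(tracer, name) {
    // Prefer the span the HTTP auto-instrumentation already opened...
    const active = opentelemetry.trace.getActiveSpan();
    if (active) {
        return { span: active, created: false };
    }
    // ...otherwise start a root span, which the caller must remember to end.
    return { span: tracer.startSpan(name), created: true };
}
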
View File: lib/services.js

@@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
+const opentelemetry = require('@opentelemetry/api');

const { errors, s3middleware } = require('arsenal');
const ObjectMD = require('arsenal').models.ObjectMD;
@@ -308,12 +309,49 @@ const services = {
     * @param {function} cb - callback from async.waterfall in objectGet
     * @return {undefined}
     */
-    deleteObject(bucketName, objectMD, objectKey, options, deferLocationDeletion, log, cb) {
+    deleteObject(bucketName, objectMD, objectKey, options, deferLocationDeletion, log, cb, tracerContext) {
+        const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
        log.trace('deleting object from bucket');
        assert.strictEqual(typeof bucketName, 'string');
        assert.strictEqual(typeof objectMD, 'object');

        function deleteMDandData() {
+            if (tracerContext) {
+                return tracer.startActiveSpan('Delete object metadata', undefined, tracerContext, deleteMdSpan => {
+                    return metadata.deleteObjectMD(bucketName, objectKey, options, log,
+                        (err, res) => {
+                            deleteMdSpan.addEvent('Delete object metadata');
+                            deleteMdSpan.end();
+                            if (err) {
+                                return cb(err, res);
+                            }
+                            log.trace('deleteObject: metadata delete OK');
+                            if (objectMD.location === null) {
+                                return cb(null, res);
+                            }
+                            if (deferLocationDeletion) {
+                                return cb(null, Array.isArray(objectMD.location)
+                                    ? objectMD.location : [objectMD.location]);
+                            }
+                            if (!Array.isArray(objectMD.location)) {
+                                data.delete(objectMD.location, log);
+                                return cb(null, res);
+                            }
+                            return tracer.startActiveSpan('Batch Deleting Data', undefined, tracerContext, deleteDataSpan => {
+                                return data.batchDelete(objectMD.location, null, null, log, err => {
+                                    deleteDataSpan.end();
+                                    if (err) {
+                                        return cb(err);
+                                    }
+                                    return cb(null, res);
+                                });
+                            });
+                        });
+                });
+            }
            return metadata.deleteObjectMD(bucketName, objectKey, options, log,
                (err, res) => {
                    if (err) {
@@ -333,7 +371,6 @@ const services = {
                    data.delete(objectMD.location, log);
                    return cb(null, res);
                }
-
                return data.batchDelete(objectMD.location, null, null, log, err => {
                    if (err) {
                        return cb(err);
@@ -346,6 +383,19 @@ const services = {
        const objGetInfo = objectMD.location;
        // special case that prevents azure blocks from unecessary deletion
        // will return null if no need
+        if (tracerContext) {
+            return tracer.startActiveSpan('Checking if object is stored in Azure', undefined, tracerContext, objectDeleteSpan => {
+                return data.protectAzureBlocks(bucketName, objectKey, objGetInfo,
+                    log, err => {
+                        objectDeleteSpan.addEvent('Checking if object is stored in Azure');
+                        objectDeleteSpan.end();
+                        if (err) {
+                            return cb(err);
+                        }
+                        return deleteMDandData();
+                    });
+            });
+        }
        return data.protectAzureBlocks(bucketName, objectKey, objGetInfo,
            log, err => {
                if (err) {
@@ -365,12 +415,18 @@ const services = {
     * @return {undefined}
     * JSON response from metastore
     */
-    getObjectListing(bucketName, listingParams, log, cb) {
+    getObjectListing(bucketName, listingParams, log, cb, tracerContext) {
+        const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
+        const objectListSpan = tracer.startSpan('Listing objects from metadata', undefined, tracerContext);
+        objectListSpan.setAttribute('code.function', 'getObjectListing()');
+        objectListSpan.setAttribute('code.filepath', 'lib/services.js');
+        objectListSpan.setAttribute('code.lineno', 374);
        assert.strictEqual(typeof bucketName, 'string');
        log.trace('performing metadata get object listing',
            { listingParams });
        metadata.listObject(bucketName, listingParams, log,
            (err, listResponse) => {
+                objectListSpan.end();
                if (err) {
                    log.debug('error from metadata', { error: err });
                    return cb(err);

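deleteObject uses tracer.startActiveSpan(name, options, context, fn), the four-argument overload in which the new span is active inside fn, so spans created by nested calls attach to it automatically. A condensed sketch of the same shape (all names here are illustrative):

const opentelemetry = require('@opentelemetry/api');

const tracer = opentelemetry.trace.getTracer('example');

function deleteWithSpan(parentCtx, doDelete, cb) {
    // Four-arg overload: startActiveSpan(name, options, context, fn).
    return tracer.startActiveSpan('delete-metadata', undefined, parentCtx, span => {
        return doDelete(err => {
            span.end(); // end on both the success and the error path
            return cb(err);
        });
    });
}
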
View File: package.json

@@ -20,7 +20,18 @@
    "homepage": "https://github.com/scality/S3#readme",
    "dependencies": {
        "@hapi/joi": "^17.1.0",
-        "arsenal": "git+https://github.com/scality/arsenal#7.70.29",
+        "@opentelemetry/api": "^1.8.0",
+        "@opentelemetry/auto-instrumentations-node": "^0.46.1",
+        "@opentelemetry/exporter-metrics-otlp-proto": "^0.52.0",
+        "@opentelemetry/exporter-trace-otlp-proto": "^0.52.0",
+        "@opentelemetry/instrumentation-ioredis": "^0.41.0",
+        "@opentelemetry/resources": "^1.24.1",
+        "@opentelemetry/sdk-metrics": "^1.24.1",
+        "@opentelemetry/sdk-node": "^0.51.1",
+        "@opentelemetry/sdk-trace-node": "^1.24.1",
+        "@opentelemetry/sdk-trace-web": "^1.25.0",
+        "@opentelemetry/semantic-conventions": "^1.24.1",
+        "arsenal": "git+https://github.com/scality/Arsenal#1abae5844aaf6a8a9782a30faffda5205760e3b2",
        "async": "~2.5.0",
        "aws-sdk": "2.905.0",
        "azure-storage": "^2.1.0",
@@ -84,7 +95,7 @@
        "start": "npm-run-all --parallel start_dmd start_s3server",
        "start_mdserver": "node mdserver.js",
        "start_dataserver": "node dataserver.js",
-        "start_s3server": "node index.js",
+        "start_s3server": "node --require ./instrumentation.js index.js",
        "start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
        "start_utapi": "node lib/utapi/utapi.js",
        "utapi_replay": "node lib/utapi/utapiReplay.js",

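The start_s3server script now preloads ./instrumentation.js before index.js, but that file is not shown in this compare view. A minimal sketch of what such a preload typically contains, using only packages added above; the service-name fallback and reliance on the exporter's default OTLP endpoint are assumptions, so verify against the actual file:

// instrumentation.js (sketch)
const { NodeSDK } = require('@opentelemetry/sdk-node');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
const { Resource } = require('@opentelemetry/resources');
const { SEMRESATTRS_SERVICE_NAME } = require('@opentelemetry/semantic-conventions');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');

const sdk = new NodeSDK({
    resource: new Resource({
        [SEMRESATTRS_SERVICE_NAME]: process.env.OTEL_SERVICE_NAME || 'cloudserver',
    }),
    // With no explicit url, the proto exporter targets http://localhost:4318/v1/traces.
    traceExporter: new OTLPTraceExporter(),
    instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start();
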
yarn.lock

File diff suppressed because it is too large