Compare commits

...

31 Commits

Author SHA1 Message Date
Anurag Mittal 0ff7357f5f
done with Put object 2024-06-13 09:44:08 +02:00
Anurag Mittal 6df85e9860
need to link API span with method span 2024-06-12 18:56:43 +02:00
Anurag Mittal f3aa959d5d
put object update 2024-06-12 18:17:34 +02:00
Anurag Mittal c17657f7e9
added bucket and request ID to trace tags 2024-06-12 14:47:46 +02:00
Anurag Mittal 2607edcf51
head object and head bucket working 2024-06-12 13:52:08 +02:00
Anurag Mittal c9e6dad068
bucket and object listing ready POC 2024-06-11 18:13:44 +02:00
Anurag Mittal 70b237d674
list bucket complete 2024-06-11 17:51:10 +02:00
Anurag Mittal 285eb6aa5d
updated bucketGet 2024-06-11 17:20:56 +02:00
Anurag Mittal be551f18a4
cleanup of traces, adding events at proper places for cloudserver and vault 2024-06-11 14:44:41 +02:00
Anurag Mittal 1119177c33
removed deleted datadog key 2024-06-11 13:27:19 +02:00
Anurag Mittal e35dc824d7
added honeycomb with latest OTel 2024-06-11 12:14:20 +02:00
Anurag Mittal 2b9339a24b
working Jaeger with latest collector over otlphttp 2024-06-11 11:56:14 +02:00
Anurag Mittal e5b445382f
moved to tempo with otlphttp 2024-06-11 11:50:58 +02:00
Anurag Mittal 198b59ad19
7 traces working with MD and data 2024-06-07 16:06:53 +02:00
Anurag Mittal e093dd2e39
working with 5 events linked 2024-06-07 15:47:47 +02:00
Anurag Mittal d6be7e5dd4
4 + 1 spans 2024-06-07 15:41:24 +02:00
Anurag Mittal 16b54f5a8e
pass around tracer in arsenal 2024-06-07 15:18:04 +02:00
Anurag Mittal 90d00ab3a9
4 span checkpoint for list bucket 2024-06-07 15:17:39 +02:00
Anurag Mittal 08b6b4ad73
finished passing tracer and span to arsenal 2024-06-07 09:31:19 +02:00
Anurag Mittal 5854544c17
Added tracer to cloud server 2024-06-07 04:33:37 +02:00
Anurag Mittal ab3464fc95
successful connection to honeycomb 2024-06-06 18:04:23 +02:00
Anurag Mittal 8ff6256c0c
configured exporter to send traces 2024-06-06 14:58:21 +02:00
Anurag Mittal aa6e1a4ffd
added exporter dependencies 2024-06-06 14:47:41 +02:00
Anurag Mittal 6d70e12230
Otel Collector stack setup complete 2024-06-06 14:33:13 +02:00
Anurag Mittal 99a65122d4
S3C-8896: deploy jaeger and zipkin 2024-06-05 13:41:13 +02:00
Anurag Mittal 16047a430c
S3C-8896: deploy collector with prometheus for metrics 2024-06-05 11:10:23 +02:00
Anurag Mittal ce81935205
S3C-8896 run prometheus and grafana 2024-06-05 10:45:52 +02:00
Anurag Mittal b174d7bfdc
S3C-8896 change service name and version 2024-06-05 10:45:23 +02:00
Anurag Mittal 9eec2b7922
S3C-8896 add OTel resource and semantic convention packages 2024-06-05 10:44:45 +02:00
Anurag Mittal 8ff8794287
S3C-8896: Added based Otel instrumentation 2024-06-05 02:01:13 +02:00
Anurag Mittal fecd2d2371
S3C-8896-added-OTel-packages 2024-06-05 02:00:33 +02:00
17 changed files with 2344 additions and 325 deletions

81
docker-compose.yml Normal file
View File

@ -0,0 +1,81 @@
# endpoints
# prometheus: http://localhost:9090
# grafana: http://localhost:3000
# jaeger: http://localhost:16686
# zipkin: http://localhost:9411
# otel-collector-metrics: http://localhost:8889
# tempo: http://localhost:3200
# version: '3'
services:
otel-collector:
image: otel/opentelemetry-collector-contrib:0.102.1
restart: always
command:
- --config=/etc/otelcol-contrib/otel-collector.yml
volumes:
- ./docker/collector/otel-collector.yml:/etc/otelcol-contrib/otel-collector.yml
ports:
- "1888:1888" # pprof extension
- "8888:8888" # Prometheus metrics exposed by the collector
- "8889:8889" # Prometheus exporter metrics
- "13133:13133" # health_check extension
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP http receiver
- "55679:55679" # zpages extension
depends_on:
- jaeger-all-in-one
- zipkin-all-in-one
prometheus:
container_name: prometheus
image: prom/prometheus
restart: always
command:
- --config.file=/etc/prometheus/prometheus.yml
volumes:
- ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
loki:
image: grafana/loki:latest
command: -config.file=/etc/loki/local-config.yaml
ports:
- "3100:3100"
tempo:
image: grafana/tempo:latest
command: [ "-config.file=/etc/tempo.yml" ]
volumes:
- ./docker/tempo/tempo.yml:/etc/tempo.yml
ports:
- "3200:3200" # tempo
- "4317" # otlp grpc
- "4318" # otlp http
grafana:
container_name: grafana
image: grafana/grafana
volumes:
- ./docker/grafana/grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
ports:
- "3000:3000"
# Jaeger
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
environment:
- COLLECTOR_OTLP_ENABLED=true
restart: always
ports:
- "16686:16686"
- "4318"
- "4317"
# Zipkin
zipkin-all-in-one:
image: openzipkin/zipkin:latest
restart: always
ports:
- "9411:9411"

View File

@ -0,0 +1,72 @@
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
grpc:
endpoint: 0.0.0.0:4317
processors:
# batch metrics before sending to reduce API usage
batch:
exporters:
logging:
loglevel: debug
loki:
endpoint: "http://loki:3100/loki/api/v1/push"
prometheus:
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1
otlphttp/tempo:
endpoint: "http://tempo:4318"
tls:
insecure: true
otlp:
endpoint: "api.eu1.honeycomb.io:443"
headers:
"x-honeycomb-team": ""
otlphttp/jaeger:
endpoint: "http://jaeger-all-in-one:4318"
tls:
insecure: true
# datadog:
# api:
# key: ""
zipkin:
endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
format: proto
# https://github.com/open-telemetry/opentelemetry-collector/blob/main/extension/README.md
extensions:
# responsible for responding to health check calls on behalf of the collector.
health_check:
# fetches the collectors performance data
pprof:
# serves as an http endpoint that provides live debugging data about instrumented components.
zpages:
service:
extensions: [health_check, pprof, zpages]
pipelines:
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
traces:
receivers: [otlp]
processors: [batch]
exporters: [otlphttp/tempo,otlphttp/jaeger,otlp,zipkin]
# exporters: [otlphttp/jaeger,zipkin]
logs:
receivers: [otlp]
exporters: [loki]

View File

@ -0,0 +1,47 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
uid: prometheus
access: proxy
orgId: 1
url: http://prometheus:9090
basicAuth: false
isDefault: false
version: 1
editable: false
jsonData:
httpMethod: GET
- name: Loki
type: loki
uid: loki
access: proxy
orgId: 1
url: http://loki:3100
basicAuth: false
isDefault: false
version: 1
editable: false
- name: Tempo
type: tempo
access: proxy
orgId: 1
url: http://tempo:3200
basicAuth: false
isDefault: true
version: 1
editable: false
apiVersion: 1
uid: tempo
jsonData:
httpMethod: GET
serviceMap:
datasourceUid: prometheus
tracesToLogsV2:
datasourceUid: loki
spanStartTimeShift: '-1h'
spanEndTimeShift: '1h'
filterByTraceID: true
filterBySpanID: true
tags: [ { key: 'service.name', value: 'job' } ]

View File

@ -0,0 +1,15 @@
global:
scrape_interval: 10s
evaluation_interval: 10s
scrape_configs:
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:8889']
# For direct metrics collection from service to Prometheus
# - job_name: 'cloudserver-service-app'
# metrics_path: /metrics
# static_configs:
# - targets: ['host.docker.internal:8002']

46
docker/tempo/tempo.yml Normal file
View File

@ -0,0 +1,46 @@
server:
http_listen_port: 3200
distributor:
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
thrift_http: #
grpc: # for a production deployment you should only enable the receivers you need!
thrift_binary:
thrift_compact:
zipkin:
otlp:
protocols:
http:
grpc:
opencensus:
ingester:
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
compactor:
compaction:
block_retention: 1h # overall Tempo trace retention. set for demo purposes
metrics_generator:
registry:
external_labels:
source: tempo
cluster: docker-compose
storage:
path: /tmp/tempo/generator/wal
remote_write:
- url: http://prometheus:9090/api/v1/write
send_exemplars: true
storage:
trace:
backend: local # backend configuration to use
wal:
path: /tmp/tempo/wal # where to store the the wal locally
local:
path: /tmp/tempo/blocks
overrides:
metrics_generator_processors: [service-graphs, span-metrics] # enables metrics generator

58
instrumentation.js Normal file
View File

@ -0,0 +1,58 @@
const opentelemetry = require('@opentelemetry/sdk-node');
const { WebTracerProvider } = require('@opentelemetry/sdk-trace-web');
const { Resource } = require('@opentelemetry/resources');
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-proto');
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
const {
SEMRESATTRS_SERVICE_NAME,
SEMRESATTRS_SERVICE_VERSION,
} = require('@opentelemetry/semantic-conventions');
// Define resource with service name and version
const resource = new Resource({
[SEMRESATTRS_SERVICE_NAME]: 'cloudserver',
[SEMRESATTRS_SERVICE_VERSION]: '7.70.47',
});
// OTLP Trace Exporter configuration
const traceExporter = new OTLPTraceExporter({
url: 'http://localhost:4318/v1/traces',
headers: {},
});
// Metric Reader configuration
const metricReader = new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: 'http://localhost:4318/v1/metrics',
headers: {},
concurrencyLimit: 1,
}),
});
// Node SDK configuration
const sdk = new opentelemetry.NodeSDK({
traceExporter,
resource,
metricReader,
instrumentations: [
getNodeAutoInstrumentations({
'@opentelemetry/instrumentation-fs': {
enabled: false,
},
}),
],
});
// Additional WebTracerProvider configuration
// This will initialize TracerProvider that will let us create a Tracers
const webTracerProvider = new WebTracerProvider({ resource });
const webSpanProcessor = new BatchSpanProcessor(traceExporter);
webTracerProvider.addSpanProcessor(webSpanProcessor);
webTracerProvider.register();
// Start the Node SDK
sdk.start();

View File

@ -1,5 +1,6 @@
const { auth, errors, policies } = require('arsenal');
const async = require('async');
const opentelemetry = require('@opentelemetry/api');
const bucketDelete = require('./bucketDelete');
const bucketDeleteCors = require('./bucketDeleteCors');
@ -75,215 +76,251 @@ auth.setHandler(vault);
/* eslint-disable no-param-reassign */
const api = {
callApiMethod(apiMethod, request, response, log, callback) {
// Attach the apiMethod method to the request, so it can used by monitoring in the server
// eslint-disable-next-line no-param-reassign
request.apiMethod = apiMethod;
callApiMethod(apiMethod, request, response, log, callback, tracer) {
return tracer.startActiveSpan('Processing API related operations', methodSpan => {
methodSpan.addEvent('Cloudserver::callApiMethod() Processing API related operations');
methodSpan.setAttribute('code.function', 'callApiMethod()');
methodSpan.setAttribute('code.filepath', 'lib/api/api.js');
methodSpan.setAttribute('code.lineno', 84);
// Attach the apiMethod method to the request, so it can used by monitoring in the server
// eslint-disable-next-line no-param-reassign
request.apiMethod = apiMethod;
const actionLog = monitoringMap[apiMethod];
if (!actionLog &&
apiMethod !== 'websiteGet' &&
apiMethod !== 'websiteHead' &&
apiMethod !== 'corsPreflight') {
log.error('callApiMethod(): No actionLog for this api method', {
apiMethod,
});
}
log.addDefaultFields({
service: 's3',
action: actionLog,
bucketName: request.bucketName,
});
if (request.objectKey) {
const actionLog = monitoringMap[apiMethod];
if (!actionLog &&
apiMethod !== 'websiteGet' &&
apiMethod !== 'websiteHead' &&
apiMethod !== 'corsPreflight') {
log.error('callApiMethod(): No actionLog for this api method', {
apiMethod,
});
}
log.addDefaultFields({
objectKey: request.objectKey,
service: 's3',
action: actionLog,
bucketName: request.bucketName,
});
}
let returnTagCount = true;
const validationRes = validateQueryAndHeaders(request, log);
if (validationRes.error) {
log.debug('request query / header validation failed', {
error: validationRes.error,
method: 'api.callApiMethod',
});
return process.nextTick(callback, validationRes.error);
}
// no need to check auth on website or cors preflight requests
if (apiMethod === 'websiteGet' || apiMethod === 'websiteHead' ||
apiMethod === 'corsPreflight') {
request.actionImplicitDenies = false;
return this[apiMethod](request, log, callback);
}
const { sourceBucket, sourceObject, sourceVersionId, parsingError } =
parseCopySource(apiMethod, request.headers['x-amz-copy-source']);
if (parsingError) {
log.debug('error parsing copy source', {
error: parsingError,
});
return process.nextTick(callback, parsingError);
}
const { httpHeadersSizeError } = checkHttpHeadersSize(request.headers);
if (httpHeadersSizeError) {
log.debug('http header size limit exceeded', {
error: httpHeadersSizeError,
});
return process.nextTick(callback, httpHeadersSizeError);
}
const requestContexts = prepareRequestContexts(apiMethod, request,
sourceBucket, sourceObject, sourceVersionId);
// Extract all the _apiMethods and store them in an array
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
// Attach the names to the current request
// eslint-disable-next-line no-param-reassign
request.apiMethods = apiMethods;
function checkAuthResults(authResults) {
if (request.objectKey) {
log.addDefaultFields({
objectKey: request.objectKey,
});
}
let returnTagCount = true;
const isImplicitDeny = {};
let isOnlyImplicitDeny = true;
if (apiMethod === 'objectGet') {
// first item checks s3:GetObject(Version) action
if (!authResults[0].isAllowed && !authResults[0].isImplicit) {
log.trace('get object authorization denial from Vault');
return errors.AccessDenied;
}
// TODO add support for returnTagCount in the bucket policy
// checks
isImplicitDeny[authResults[0].action] = authResults[0].isImplicit;
// second item checks s3:GetObject(Version)Tagging action
if (!authResults[1].isAllowed) {
log.trace('get tagging authorization denial ' +
'from Vault');
returnTagCount = false;
}
} else {
for (let i = 0; i < authResults.length; i++) {
isImplicitDeny[authResults[i].action] = true;
if (!authResults[i].isAllowed && !authResults[i].isImplicit) {
// Any explicit deny rejects the current API call
log.trace('authorization denial from Vault');
const validationRes = validateQueryAndHeaders(request, log);
if (validationRes.error) {
log.debug('request query / header validation failed', {
error: validationRes.error,
method: 'api.callApiMethod',
});
return process.nextTick(callback, validationRes.error);
}
// no need to check auth on website or cors preflight requests
if (apiMethod === 'websiteGet' || apiMethod === 'websiteHead' ||
apiMethod === 'corsPreflight') {
request.actionImplicitDenies = false;
return this[apiMethod](request, log, callback);
}
const { sourceBucket, sourceObject, sourceVersionId, parsingError } =
parseCopySource(apiMethod, request.headers['x-amz-copy-source']);
if (parsingError) {
log.debug('error parsing copy source', {
error: parsingError,
});
return process.nextTick(callback, parsingError);
}
const { httpHeadersSizeError } = checkHttpHeadersSize(request.headers);
if (httpHeadersSizeError) {
log.debug('http header size limit exceeded', {
error: httpHeadersSizeError,
});
return process.nextTick(callback, httpHeadersSizeError);
}
const requestContexts = prepareRequestContexts(apiMethod, request,
sourceBucket, sourceObject, sourceVersionId);
// Extract all the _apiMethods and store them in an array
const apiMethods = requestContexts ? requestContexts.map(context => context._apiMethod) : [];
// Attach the names to the current request
// eslint-disable-next-line no-param-reassign
request.apiMethods = apiMethods;
function checkAuthResults(authResults) {
let returnTagCount = true;
const isImplicitDeny = {};
let isOnlyImplicitDeny = true;
if (apiMethod === 'objectGet') {
// first item checks s3:GetObject(Version) action
if (!authResults[0].isAllowed && !authResults[0].isImplicit) {
log.trace('get object authorization denial from Vault');
return errors.AccessDenied;
}
if (authResults[i].isAllowed) {
// If the action is allowed, the result is not implicit
// Deny.
isImplicitDeny[authResults[i].action] = false;
isOnlyImplicitDeny = false;
// TODO add support for returnTagCount in the bucket policy
// checks
isImplicitDeny[authResults[0].action] = authResults[0].isImplicit;
// second item checks s3:GetObject(Version)Tagging action
if (!authResults[1].isAllowed) {
log.trace('get tagging authorization denial ' +
'from Vault');
returnTagCount = false;
}
} else {
for (let i = 0; i < authResults.length; i++) {
isImplicitDeny[authResults[i].action] = true;
if (!authResults[i].isAllowed && !authResults[i].isImplicit) {
// Any explicit deny rejects the current API call
log.trace('authorization denial from Vault');
return errors.AccessDenied;
}
if (authResults[i].isAllowed) {
// If the action is allowed, the result is not implicit
// Deny.
isImplicitDeny[authResults[i].action] = false;
isOnlyImplicitDeny = false;
}
}
}
// These two APIs cannot use ACLs or Bucket Policies, hence, any
// implicit deny from vault must be treated as an explicit deny.
if ((apiMethod === 'bucketPut' || apiMethod === 'serviceGet') && isOnlyImplicitDeny) {
return errors.AccessDenied;
}
return { returnTagCount, isImplicitDeny };
}
// These two APIs cannot use ACLs or Bucket Policies, hence, any
// implicit deny from vault must be treated as an explicit deny.
if ((apiMethod === 'bucketPut' || apiMethod === 'serviceGet') && isOnlyImplicitDeny) {
return errors.AccessDenied;
}
return { returnTagCount, isImplicitDeny };
}
return async.waterfall([
next => auth.server.doAuth(
request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
if (err) {
log.trace('authentication error', { error: err });
return next(err);
return async.waterfall([
next => tracer.startActiveSpan('Authentication of user against IAM', authUserSpan => {
authUserSpan.setAttribute('code.function', 'auth.server.doAuth()');
authUserSpan.setAttribute('code.filepath', 'lib/api/api.js');
authUserSpan.setAttribute('code.lineno', 197);
return next(null, authUserSpan);
}),
(authSpan, next) => auth.server.doAuth(
request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
if (err) {
log.trace('authentication error', { error: err });
authSpan.end();
return next(err);
}
const authNames = { accountName: userInfo.getAccountDisplayName() };
authSpan.setAttribute('user.accountName', authNames.accountName);
if (authNames.userName) {
authSpan.setAttribute('user.userName', authNames.userName);
}
authSpan.end();
return next(null, userInfo, authorizationResults, streamingV4Params);
}, 's3', requestContexts),
(userInfo, authorizationResults, streamingV4Params, next) => {
const authNames = { accountName: userInfo.getAccountDisplayName() };
if (userInfo.isRequesterAnIAMUser()) {
authNames.userName = userInfo.getIAMdisplayName();
}
return next(null, userInfo, authorizationResults, streamingV4Params);
}, 's3', requestContexts),
(userInfo, authorizationResults, streamingV4Params, next) => {
const authNames = { accountName: userInfo.getAccountDisplayName() };
if (userInfo.isRequesterAnIAMUser()) {
authNames.userName = userInfo.getIAMdisplayName();
}
log.addDefaultFields(authNames);
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
return next(null, userInfo, authorizationResults, streamingV4Params);
}
// issue 100 Continue to the client
writeContinue(request, response);
const MAX_POST_LENGTH = request.method === 'POST' ?
1024 * 1024 : 1024 * 1024 / 2; // 1 MB or 512 KB
const post = [];
let postLength = 0;
request.on('data', chunk => {
postLength += chunk.length;
// Sanity check on post length
if (postLength <= MAX_POST_LENGTH) {
post.push(chunk);
log.addDefaultFields(authNames);
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
return next(null, userInfo, authorizationResults, streamingV4Params);
}
});
request.on('error', err => {
log.trace('error receiving request', {
error: err,
// issue 100 Continue to the client
writeContinue(request, response);
const MAX_POST_LENGTH = request.method === 'POST' ?
1024 * 1024 : 1024 * 1024 / 2; // 1 MB or 512 KB
const post = [];
let postLength = 0;
request.on('data', chunk => {
postLength += chunk.length;
// Sanity check on post length
if (postLength <= MAX_POST_LENGTH) {
post.push(chunk);
}
});
request.on('error', err => {
log.trace('error receiving request', {
error: err,
});
return next(errors.InternalError);
});
return next(errors.InternalError);
});
request.on('end', () => {
if (postLength > MAX_POST_LENGTH) {
log.error('body length is too long for request type',
{ postLength });
return next(errors.InvalidRequest);
}
// Convert array of post buffers into one string
request.post = Buffer.concat(post, postLength).toString();
return next(null, userInfo, authorizationResults, streamingV4Params);
});
return undefined;
},
// Tag condition keys require information from CloudServer for evaluation
(userInfo, authorizationResults, streamingV4Params, next) => tagConditionKeyAuth(
authorizationResults,
request,
requestContexts,
apiMethod,
log,
(err, authResultsWithTags) => {
if (err) {
log.trace('tag authentication error', { error: err });
return next(err);
}
return next(null, userInfo, authResultsWithTags, streamingV4Params);
request.on('end', () => {
if (postLength > MAX_POST_LENGTH) {
log.error('body length is too long for request type',
{ postLength });
return next(errors.InvalidRequest);
}
// Convert array of post buffers into one string
request.post = Buffer.concat(post, postLength).toString();
return next(null, userInfo, authorizationResults, streamingV4Params);
});
return undefined;
},
),
], (err, userInfo, authorizationResults, streamingV4Params) => {
if (err) {
return callback(err);
}
if (authorizationResults) {
const checkedResults = checkAuthResults(authorizationResults);
if (checkedResults instanceof Error) {
return callback(checkedResults);
// Tag condition keys require information from CloudServer for evaluation
(userInfo, authorizationResults, streamingV4Params, next) => tagConditionKeyAuth(
authorizationResults,
request,
requestContexts,
apiMethod,
log,
(err, authResultsWithTags) => {
methodSpan.addEvent('Authentication of user completed');
if (err) {
log.trace('tag authentication error', { error: err });
return next(err);
}
return next(null, userInfo, authResultsWithTags, streamingV4Params);
},
),
], (err, userInfo, authorizationResults, streamingV4Params) => {
if (err) {
return callback(err);
}
returnTagCount = checkedResults.returnTagCount;
request.actionImplicitDenies = checkedResults.isImplicitDeny;
} else {
// create an object of keys apiMethods with all values to false:
// for backward compatibility, all apiMethods are allowed by default
// thus it is explicitly allowed, so implicit deny is false
request.actionImplicitDenies = apiMethods.reduce((acc, curr) => {
acc[curr] = false;
return acc;
}, {});
}
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
request._response = response;
return this[apiMethod](userInfo, request, streamingV4Params,
log, callback, authorizationResults);
}
if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
return this[apiMethod](userInfo, request, sourceBucket,
sourceObject, sourceVersionId, log, callback);
}
if (apiMethod === 'objectGet') {
return this[apiMethod](userInfo, request, returnTagCount, log, callback);
}
return this[apiMethod](userInfo, request, log, callback);
if (authorizationResults) {
const checkedResults = checkAuthResults(authorizationResults);
if (checkedResults instanceof Error) {
return callback(checkedResults);
}
returnTagCount = checkedResults.returnTagCount;
request.actionImplicitDenies = checkedResults.isImplicitDeny;
} else {
// create an object of keys apiMethods with all values to false:
// for backward compatibility, all apiMethods are allowed by default
// thus it is explicitly allowed, so implicit deny is false
request.actionImplicitDenies = apiMethods.reduce((acc, curr) => {
acc[curr] = false;
return acc;
}, {});
}
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
methodSpan,
);
const apiSpan = tracer.startSpan(`API operation for ${apiMethod}`, undefined, ctx);
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
request._response = response;
return this[apiMethod](userInfo, request, streamingV4Params,
log, callback, authorizationResults);
}
if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
return this[apiMethod](userInfo, request, sourceBucket,
sourceObject, sourceVersionId, log, callback, apiSpan);
}
if (apiMethod === 'objectGet') {
return this[apiMethod](userInfo, request, returnTagCount, log, callback, apiSpan, methodSpan);
}
return this[apiMethod](userInfo, request, log, (err, res, cosrsHeaders) => {
methodSpan.addEvent('Cloudserver::callApiMethod() API operation completed, sending response to client');
apiSpan.end();
return process.nextTick(() => {
methodSpan.end();
if (err) {
return callback(err);
}
return callback(null, res, cosrsHeaders);
});
}, apiSpan);
});
});
},
bucketDelete,

View File

@ -1,5 +1,6 @@
const querystring = require('querystring');
const { errors, versioning, s3middleware } = require('arsenal');
const opentelemetry = require('@opentelemetry/api');
const constants = require('../../constants');
const services = require('../services');
@ -261,7 +262,12 @@ function processMasterVersions(bucketName, listParams, list) {
}
function handleResult(listParams, requestMaxKeys, encoding, authInfo,
bucketName, list, corsHeaders, log, callback) {
bucketName, list, corsHeaders, log, callback, traceContext) {
const tracer = opentelemetry.trace.getTracer('cloudserver');
const handleResultSpan = tracer.startSpan('Preparing result', undefined, traceContext);
handleResultSpan.setAttribute('code.function', 'handleResult()');
handleResultSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
handleResultSpan.setAttribute('code.lineno', 270);
// eslint-disable-next-line no-param-reassign
listParams.maxKeys = requestMaxKeys;
// eslint-disable-next-line no-param-reassign
@ -275,6 +281,7 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
}
pushMetric('listBucket', log, { authInfo, bucket: bucketName });
monitoring.promMetrics('GET', bucketName, '200', 'listBucket');
handleResultSpan.end();
return callback(null, res, corsHeaders);
}
@ -288,7 +295,16 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
* with either error code or xml response body
* @return {undefined}
*/
function bucketGet(authInfo, request, log, callback) {
function bucketGet(authInfo, request, log, callback, parentSpan) {
parentSpan.addEvent('Cloudserver::bucketGet() processing bucketGet request');
parentSpan.setAttribute('code.function', 'bucketGet()');
parentSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
parentSpan.setAttribute('code.lineno', 299);
// const tracer = opentelemetry.trace.getTracer('cloudserver');
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
parentSpan,
);
const params = request.query;
const bucketName = request.bucketName;
const v2 = params['list-type'];
@ -350,7 +366,7 @@ function bucketGet(authInfo, request, log, callback) {
} else {
listParams.marker = params.marker;
}
// parentSpan.addEvent('Cloudserver::bucketGet() validating Metadata');
standardMetadataValidateBucket(metadataValParams, request.actionImplicitDenies, log, (err, bucket) => {
const corsHeaders = collectCorsHeaders(request.headers.origin,
request.method, bucket);
@ -367,7 +383,9 @@ function bucketGet(authInfo, request, log, callback) {
listParams.versionIdMarker = params['version-id-marker'] ?
versionIdUtils.decode(params['version-id-marker']) : undefined;
}
// parentSpan.addEvent('Cloudserver::bucketGet() Getting object listing');
if (!requestMaxKeys) {
parentSpan.addEvent('Cloudserver::bucketGet() Empty bucket');
const emptyList = {
CommonPrefixes: [],
Contents: [],
@ -375,7 +393,7 @@ function bucketGet(authInfo, request, log, callback) {
IsTruncated: false,
};
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
bucketName, emptyList, corsHeaders, log, callback);
bucketName, emptyList, corsHeaders, log, callback, ctx);
}
return services.getObjectListing(bucketName, listParams, log,
(err, list) => {
@ -386,9 +404,9 @@ function bucketGet(authInfo, request, log, callback) {
return callback(err, null, corsHeaders);
}
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
bucketName, list, corsHeaders, log, callback);
});
});
bucketName, list, corsHeaders, log, callback, ctx);
}, ctx);
}, ctx);
return undefined;
}

View File

@ -1,5 +1,6 @@
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
const { standardMetadataValidateBucket } = require('../metadata/metadataUtils');
const opentelemetry = require('@opentelemetry/api');
const { pushMetric } = require('../utapi/utilities');
const monitoring = require('../utilities/metrics');
@ -13,7 +14,15 @@ const monitoring = require('../utilities/metrics');
* with either error code or success
* @return {undefined}
*/
function bucketHead(authInfo, request, log, callback) {
function bucketHead(authInfo, request, log, callback, parentSpan) {
parentSpan.addEvent('Cloudserver::bucketGet() processing Head Bucket request');
parentSpan.setAttribute('code.function', 'bucketHead()');
parentSpan.setAttribute('code.filepath', 'lib/api/bucketHead.js');
parentSpan.setAttribute('code.lineno', 20);
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
parentSpan,
);
log.debug('processing request', { method: 'bucketHead' });
const bucketName = request.bucketName;
const metadataValParams = {
@ -38,7 +47,7 @@ function bucketHead(authInfo, request, log, callback) {
'x-amz-bucket-region': bucket.getLocationConstraint(),
};
return callback(null, Object.assign(corsHeaders, headers));
});
}, ctx);
}
module.exports = bucketHead;

View File

@ -1,5 +1,6 @@
const { errors, s3middleware } = require('arsenal');
const { parseRange } = require('arsenal').network.http.utils;
const opentelemetry = require('@opentelemetry/api');
const { data } = require('../data/wrapper');
@ -25,7 +26,15 @@ const validateHeaders = s3middleware.validateConditionalHeaders;
* @param {function} callback - callback to function in route
* @return {undefined}
*/
function objectGet(authInfo, request, returnTagCount, log, callback) {
function objectGet(authInfo, request, returnTagCount, log, callback, apiSpan, callAPIMethodSpan) {
apiSpan.addEvent('Cloudserver::objectGet() processing objectGet request');
apiSpan.setAttribute('code.function', 'objectGet()');
apiSpan.setAttribute('code.filepath', 'lib/api/objectGet.js');
apiSpan.setAttribute('code.lineno', 32);
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
apiSpan,
);
log.debug('processing request', { method: 'objectGet' });
const bucketName = request.bucketName;
const objectKey = request.objectKey;
@ -64,6 +73,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
return callback(err, null, corsHeaders);
}
if (!objMD) {
apiSpan.addEvent(versionId ? 'Cloudserver::objectGet() Object version not found' : 'Cloudserver::objectGet() Object not found');
const err = versionId ? errors.NoSuchVersion : errors.NoSuchKey;
monitoring.promMetrics(
'GET', bucketName, err.code, 'getObject');
@ -87,6 +97,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
return callback(errors.MethodNotAllowed, null,
responseMetaHeaders);
}
apiSpan.addEvent('Cloudserver::bucketGet() Validating API request headers against object metadata');
const headerValResult = validateHeaders(request.headers,
objMD['last-modified'], objMD['content-md5']);
if (headerValResult.error) {
@ -233,7 +244,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
dataLocator = setPartRanges(dataLocator, byteRange);
}
}
apiSpan.addEvent('Cloudserver::bucketGet() Trying to get data location using head request');
return data.head(dataLocator, log, err => {
apiSpan.addEvent('Cloudserver::bucketGet() Data location found');
if (err) {
log.error('error from external backend checking for ' +
'object existence', { error: err });
@ -253,9 +266,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
monitoring.promMetrics('GET', bucketName, '200', 'getObject',
Number.parseInt(responseMetaHeaders['Content-Length'], 10));
return callback(null, dataLocator, responseMetaHeaders,
byteRange);
byteRange, apiSpan, callAPIMethodSpan);
});
});
}, ctx);
}
module.exports = objectGet;

View File

@ -1,6 +1,7 @@
const { errors, s3middleware } = require('arsenal');
const validateHeaders = s3middleware.validateConditionalHeaders;
const { parseRange } = require('arsenal').network.http.utils;
const opentelemetry = require('@opentelemetry/api');
const { decodeVersionId } = require('./apiUtils/object/versioning');
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
@ -25,7 +26,15 @@ const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
* @return {undefined}
*
*/
function objectHead(authInfo, request, log, callback) {
function objectHead(authInfo, request, log, callback, parentSpan) {
parentSpan.addEvent('Cloudserver::bucketGet() processing Head Object request');
parentSpan.setAttribute('code.function', 'objectHead()');
parentSpan.setAttribute('code.filepath', 'lib/api/objectHead.js');
parentSpan.setAttribute('code.lineno', 32);
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
parentSpan,
);
log.debug('processing request', { method: 'objectHead' });
const bucketName = request.bucketName;
const objectKey = request.objectKey;
@ -160,7 +169,7 @@ function objectHead(authInfo, request, log, callback) {
});
monitoring.promMetrics('HEAD', bucketName, '200', 'headObject');
return callback(null, responseHeaders);
});
}, ctx);
}
module.exports = objectHead;

View File

@ -51,8 +51,11 @@ function generateXml(xml, owner, userBuckets, splitter) {
* @param {function} callback - callback
* @return {undefined}
*/
function serviceGet(authInfo, request, log, callback) {
function serviceGet(authInfo, request, log, callback, parentSpan) {
log.debug('processing request', { method: 'serviceGet' });
parentSpan.setAttribute('code.function', 'serviceGet()');
parentSpan.setAttribute('code.filepath', 'lib/api/serviceGet.js');
parentSpan.setAttribute('code.lineno', 58);
if (authInfo.isRequesterPublicUser()) {
log.debug('operation not available for public user');
@ -73,6 +76,7 @@ function serviceGet(authInfo, request, log, callback) {
'</Owner>',
'<Buckets>'
);
parentSpan.addEvent('getting metadata');
return services.getService(authInfo, request, log, constants.splitter,
(err, userBuckets, splitter) => {
if (err) {

View File

@ -1,5 +1,6 @@
const async = require('async');
const { errors } = require('arsenal');
const opentelemetry = require('@opentelemetry/api');
const metadata = require('./wrapper');
const BucketInfo = require('arsenal').models.BucketInfo;
@ -181,9 +182,15 @@ function validateBucket(bucket, params, log, actionImplicitDenies = {}) {
* @param {boolean} actionImplicitDenies - identity authorization results
* @param {RequestLogger} log - request logger
* @param {function} callback - callback
* @param {object} tracerContext - tracing context
* @return {undefined} - and call callback with params err, bucket md
*/
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback) {
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback, tracerContext) {
const tracer = opentelemetry.trace.getTracer('cloudserver');
const bucketPolicyAuthSpan = tracer.startSpan('Retrieving Metadata and validating user against bucket policy', undefined, tracerContext);
bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucketAndObj()');
bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
bucketPolicyAuthSpan.setAttribute('code.lineno', 193);
const { authInfo, bucketName, objectKey, versionId, getDeleteMarker, request } = params;
let requestType = params.requestType;
if (!Array.isArray(requestType)) {
@ -196,7 +203,15 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
if (getDeleteMarker) {
getOptions.getDeleteMarker = true;
}
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket and object');
const ctx = opentelemetry.trace.setSpan(
opentelemetry.context.active(),
bucketPolicyAuthSpan,
);
const metadataSpan = tracer.startSpan('Retrieving metadata', undefined, ctx);
return metadata.getBucketAndObjectMD(bucketName, objectKey, getOptions, log, (err, getResult) => {
metadataSpan.end();
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
if (err) {
// if some implicit iamAuthzResults, return AccessDenied
// before leaking any state information
@ -209,6 +224,7 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
});
},
(getResult, next) => {
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating metadata of bucket and object');
const bucket = getResult.bucket ?
BucketInfo.deSerialize(getResult.bucket) : undefined;
if (!bucket) {
@ -227,9 +243,11 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
return getNullVersionFromMaster(bucketName, objectKey, log,
(err, nullVer) => next(err, bucket, nullVer));
}
bucketPolicyAuthSpan.addEvent('Cloudserver:: bucket and object metadata validated');
return next(null, bucket, objMD);
},
(bucket, objMD, next) => {
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy');
const canonicalID = authInfo.getCanonicalID();
if (!isObjAuthorized(bucket, objMD, requestType, canonicalID, authInfo, log, request,
actionImplicitDenies)) {
@ -239,13 +257,18 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
return next(null, bucket, objMD);
},
], (err, bucket, objMD) => {
bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
if (err) {
// still return bucket for cors headers
return callback(err, bucket);
}
return callback(null, bucket, objMD);
return process.nextTick(() => {
bucketPolicyAuthSpan.end();
return callback(null, bucket, objMD);
});
});
}
/** standardMetadataValidateBucket - retrieve bucket from metadata and check if user
* is authorized to access it
* @param {object} params - function parameters
@ -256,11 +279,20 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
* @param {boolean} actionImplicitDenies - identity authorization results
* @param {RequestLogger} log - request logger
* @param {function} callback - callback
* @param {object} tracerContext - tracing context
* @return {undefined} - and call callback with params err, bucket md
*/
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback) {
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback, tracerContext) {
const tracer = opentelemetry.trace.getTracer('cloudserver');
const bucketPolicyAuthSpan = tracer.startSpan('Validating user against bucket policy', undefined, tracerContext);
bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucket()');
bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
bucketPolicyAuthSpan.setAttribute('code.lineno', 267);
const { bucketName } = params;
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket');
return metadata.getBucket(bucketName, log, (err, bucket) => {
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
if (err) {
// if some implicit actionImplicitDenies, return AccessDenied before
// leaking any state information
@ -270,8 +302,13 @@ function standardMetadataValidateBucket(params, actionImplicitDenies, log, callb
log.debug('metadata getbucket failed', { error: err });
return callback(err);
}
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy2');
const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
return callback(validationError, bucket);
bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
return process.nextTick(() => {
bucketPolicyAuthSpan.end();
return callback(validationError, bucket);
});
});
}

View File

@ -3,6 +3,7 @@ const https = require('https');
const cluster = require('cluster');
const { series } = require('async');
const arsenal = require('arsenal');
const opentelemetry = require('@opentelemetry/api');
const { RedisClient, StatsClient } = arsenal.metrics;
const monitoringClient = require('./utilities/monitoringHandler');
const metrics = require('./utilities/metrics');
@ -89,6 +90,12 @@ class S3Server {
* @returns {void}
*/
routeRequest(req, res) {
const tracer = opentelemetry.trace.getTracer('cloudserver');
const activeSpan = opentelemetry.trace.getActiveSpan(opentelemetry.context.active());
activeSpan.addEvent('Cloudserver::routeRequest() Starting routing of request');
activeSpan.setAttribute('code.function', 'routeRequest()');
activeSpan.setAttribute('code.filepath', 'lib/server.js');
activeSpan.setAttribute('code.lineno', 98);
metrics.httpActiveRequests.inc();
const requestStartTime = process.hrtime.bigint();
@ -140,7 +147,18 @@ class S3Server {
vault,
},
};
routes(req, res, params, logger, _config);
// activeSpan.addEvent('I am going to route the request now');
// activeSpan.setAttribute('s3.bucket_name', req.bucketName);
// activeSpan.setAttribute('s3.object_key', req.objectKey);
// activeSpan.updateName(`${req.method} ${req.bucketName}/${req.objectKey}`);
// const v4Span = tracer.startSpan('verifySignatureV4', {
// parent: activeSpan,
// });
routes(req, res, params, logger, _config, activeSpan, tracer);
// return tracer.startActiveSpan('routeRequest', span => {
// routes(req, res, params, logger, _config);
// span.end();
// });
}
/**

View File

@ -1,6 +1,7 @@
const assert = require('assert');
const async = require('async');
const opentelemetry = require('@opentelemetry/api');
const { errors, s3middleware } = require('arsenal');
const ObjectMD = require('arsenal').models.ObjectMD;
@ -365,12 +366,18 @@ const services = {
* @return {undefined}
* JSON response from metastore
*/
getObjectListing(bucketName, listingParams, log, cb) {
getObjectListing(bucketName, listingParams, log, cb, tracerContext) {
const tracer = opentelemetry.trace.getTracer('cloudserver');
const objectListSpan = tracer.startSpan('Listing objects from metadata', undefined, tracerContext);
objectListSpan.setAttribute('code.function', 'getObjectListing()');
objectListSpan.setAttribute('code.filepath', 'lib/services.js');
objectListSpan.setAttribute('code.lineno', 374);
assert.strictEqual(typeof bucketName, 'string');
log.trace('performing metadata get object listing',
{ listingParams });
metadata.listObject(bucketName, listingParams, log,
(err, listResponse) => {
objectListSpan.end();
if (err) {
log.debug('error from metadata', { error: err });
return cb(err);

View File

@ -20,7 +20,17 @@
"homepage": "https://github.com/scality/S3#readme",
"dependencies": {
"@hapi/joi": "^17.1.0",
"arsenal": "git+https://github.com/scality/arsenal#7.70.29",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/auto-instrumentations-node": "^0.46.1",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.52.0",
"@opentelemetry/exporter-trace-otlp-proto": "^0.52.0",
"@opentelemetry/resources": "^1.24.1",
"@opentelemetry/sdk-metrics": "^1.24.1",
"@opentelemetry/sdk-node": "^0.51.1",
"@opentelemetry/sdk-trace-node": "^1.24.1",
"@opentelemetry/sdk-trace-web": "^1.25.0",
"@opentelemetry/semantic-conventions": "^1.24.1",
"arsenal": "git+https://github.com/scality/Arsenal#7f38aa4346205caee4e21ecb3a5c09e9131f0f20",
"async": "~2.5.0",
"aws-sdk": "2.905.0",
"azure-storage": "^2.1.0",
@ -84,7 +94,7 @@
"start": "npm-run-all --parallel start_dmd start_s3server",
"start_mdserver": "node mdserver.js",
"start_dataserver": "node dataserver.js",
"start_s3server": "node index.js",
"start_s3server": "node --require ./instrumentation.js index.js",
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
"start_utapi": "node lib/utapi/utapi.js",
"utapi_replay": "node lib/utapi/utapiReplay.js",

1756
yarn.lock

File diff suppressed because it is too large Load Diff