Compare commits
40 Commits
developmen
...
feature/S3
Author | SHA1 | Date |
---|---|---|
Anurag Mittal | 70e03f9ecd | |
Anurag Mittal | bcea25d318 | |
Anurag Mittal | 4a408af758 | |
Anurag Mittal | 2854a4355a | |
Anurag Mittal | b1e869f330 | |
Anurag Mittal | 7a9f40a91e | |
Anurag Mittal | 63579e2782 | |
Anurag Mittal | c59c180547 | |
Anurag Mittal | 4c1680fa57 | |
Anurag Mittal | 0ff7357f5f | |
Anurag Mittal | 6df85e9860 | |
Anurag Mittal | f3aa959d5d | |
Anurag Mittal | c17657f7e9 | |
Anurag Mittal | 2607edcf51 | |
Anurag Mittal | c9e6dad068 | |
Anurag Mittal | 70b237d674 | |
Anurag Mittal | 285eb6aa5d | |
Anurag Mittal | be551f18a4 | |
Anurag Mittal | 1119177c33 | |
Anurag Mittal | e35dc824d7 | |
Anurag Mittal | 2b9339a24b | |
Anurag Mittal | e5b445382f | |
Anurag Mittal | 198b59ad19 | |
Anurag Mittal | e093dd2e39 | |
Anurag Mittal | d6be7e5dd4 | |
Anurag Mittal | 16b54f5a8e | |
Anurag Mittal | 90d00ab3a9 | |
Anurag Mittal | 08b6b4ad73 | |
Anurag Mittal | 5854544c17 | |
Anurag Mittal | ab3464fc95 | |
Anurag Mittal | 8ff6256c0c | |
Anurag Mittal | aa6e1a4ffd | |
Anurag Mittal | 6d70e12230 | |
Anurag Mittal | 99a65122d4 | |
Anurag Mittal | 16047a430c | |
Anurag Mittal | ce81935205 | |
Anurag Mittal | b174d7bfdc | |
Anurag Mittal | 9eec2b7922 | |
Anurag Mittal | 8ff8794287 | |
Anurag Mittal | fecd2d2371 |
|
@ -0,0 +1,82 @@
|
|||
# endpoints
|
||||
# prometheus: http://localhost:9090
|
||||
# grafana: http://localhost:3000
|
||||
# jaeger: http://localhost:16686
|
||||
# zipkin: http://localhost:9411
|
||||
# otel-collector-metrics: http://localhost:8889
|
||||
# tempo: http://localhost:3200
|
||||
|
||||
# version: '3'
|
||||
services:
|
||||
otel-collector:
|
||||
image: otel/opentelemetry-collector-contrib:0.102.1
|
||||
restart: always
|
||||
command:
|
||||
- --config=/etc/otelcol-contrib/otel-collector.yml
|
||||
volumes:
|
||||
- ./docker/collector/otel-collector.yml:/etc/otelcol-contrib/otel-collector.yml
|
||||
ports:
|
||||
- "1888:1888" # pprof extension
|
||||
- "8888:8888" # Prometheus metrics exposed by the collector
|
||||
- "8889:8889" # Prometheus exporter metrics
|
||||
- "13133:13133" # health_check extension
|
||||
- "4317:4317" # OTLP gRPC receiver
|
||||
- "4318:4318" # OTLP http receiver
|
||||
- "55679:55679" # zpages extension
|
||||
depends_on:
|
||||
- jaeger-all-in-one
|
||||
- zipkin-all-in-one
|
||||
|
||||
prometheus:
|
||||
container_name: prometheus
|
||||
image: prom/prometheus
|
||||
restart: always
|
||||
command:
|
||||
- --config.file=/etc/prometheus/prometheus.yml
|
||||
volumes:
|
||||
- ./docker/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
|
||||
ports:
|
||||
- "9090:9090"
|
||||
|
||||
loki:
|
||||
image: grafana/loki:latest
|
||||
command: -config.file=/etc/loki/local-config.yaml
|
||||
ports:
|
||||
- "3100:3100"
|
||||
|
||||
tempo:
|
||||
image: grafana/tempo:latest
|
||||
command: [ "-config.file=/etc/tempo.yml" ]
|
||||
volumes:
|
||||
- ./docker/tempo/tempo.yml:/etc/tempo.yml
|
||||
ports:
|
||||
- "3200:3200" # tempo
|
||||
- "4317" # otlp grpc
|
||||
- "4318" # otlp http
|
||||
|
||||
grafana:
|
||||
container_name: grafana
|
||||
image: grafana/grafana
|
||||
volumes:
|
||||
- ./docker/grafana/grafana-datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
|
||||
ports:
|
||||
- "3000:3000"
|
||||
|
||||
# Jaeger
|
||||
jaeger-all-in-one:
|
||||
image: jaegertracing/all-in-one:latest
|
||||
environment:
|
||||
- COLLECTOR_OTLP_ENABLED=true
|
||||
- JAEGER_QUERY_MAX_CLOCK_SKEW_ADJUSTMENT=0s
|
||||
restart: always
|
||||
ports:
|
||||
- "16686:16686"
|
||||
- "4318"
|
||||
- "4317"
|
||||
|
||||
# Zipkin
|
||||
zipkin-all-in-one:
|
||||
image: openzipkin/zipkin:latest
|
||||
restart: always
|
||||
ports:
|
||||
- "9411:9411"
|
|
@ -0,0 +1,72 @@
|
|||
receivers:
|
||||
otlp:
|
||||
protocols:
|
||||
http:
|
||||
endpoint: 0.0.0.0:4318
|
||||
grpc:
|
||||
endpoint: 0.0.0.0:4317
|
||||
|
||||
processors:
|
||||
# batch metrics before sending to reduce API usage
|
||||
batch:
|
||||
|
||||
exporters:
|
||||
logging:
|
||||
loglevel: debug
|
||||
|
||||
loki:
|
||||
endpoint: "http://loki:3100/loki/api/v1/push"
|
||||
|
||||
prometheus:
|
||||
endpoint: "0.0.0.0:8889"
|
||||
const_labels:
|
||||
label1: value1
|
||||
|
||||
otlphttp/tempo:
|
||||
endpoint: "http://tempo:4318"
|
||||
tls:
|
||||
insecure: true
|
||||
|
||||
otlp:
|
||||
endpoint: "api.eu1.honeycomb.io:443"
|
||||
headers:
|
||||
"x-honeycomb-team": ""
|
||||
|
||||
otlphttp/jaeger:
|
||||
endpoint: "http://jaeger-all-in-one:4318"
|
||||
tls:
|
||||
insecure: true
|
||||
|
||||
# datadog:
|
||||
# api:
|
||||
# key: ""
|
||||
|
||||
zipkin:
|
||||
endpoint: "http://zipkin-all-in-one:9411/api/v2/spans"
|
||||
format: proto
|
||||
|
||||
|
||||
# https://github.com/open-telemetry/opentelemetry-collector/blob/main/extension/README.md
|
||||
extensions:
|
||||
# responsible for responding to health check calls on behalf of the collector.
|
||||
health_check:
|
||||
# fetches the collector’s performance data
|
||||
pprof:
|
||||
# serves as an http endpoint that provides live debugging data about instrumented components.
|
||||
zpages:
|
||||
|
||||
service:
|
||||
extensions: [health_check, pprof, zpages]
|
||||
pipelines:
|
||||
metrics:
|
||||
receivers: [otlp]
|
||||
processors: [batch]
|
||||
exporters: [prometheus]
|
||||
traces:
|
||||
receivers: [otlp]
|
||||
processors: [batch]
|
||||
exporters: [otlphttp/tempo,otlphttp/jaeger,otlp,zipkin]
|
||||
# exporters: [otlphttp/jaeger,zipkin]
|
||||
logs:
|
||||
receivers: [otlp]
|
||||
exporters: [loki]
|
|
@ -0,0 +1,47 @@
|
|||
apiVersion: 1
|
||||
|
||||
datasources:
|
||||
- name: Prometheus
|
||||
type: prometheus
|
||||
uid: prometheus
|
||||
access: proxy
|
||||
orgId: 1
|
||||
url: http://prometheus:9090
|
||||
basicAuth: false
|
||||
isDefault: false
|
||||
version: 1
|
||||
editable: false
|
||||
jsonData:
|
||||
httpMethod: GET
|
||||
- name: Loki
|
||||
type: loki
|
||||
uid: loki
|
||||
access: proxy
|
||||
orgId: 1
|
||||
url: http://loki:3100
|
||||
basicAuth: false
|
||||
isDefault: false
|
||||
version: 1
|
||||
editable: false
|
||||
- name: Tempo
|
||||
type: tempo
|
||||
access: proxy
|
||||
orgId: 1
|
||||
url: http://tempo:3200
|
||||
basicAuth: false
|
||||
isDefault: true
|
||||
version: 1
|
||||
editable: false
|
||||
apiVersion: 1
|
||||
uid: tempo
|
||||
jsonData:
|
||||
httpMethod: GET
|
||||
serviceMap:
|
||||
datasourceUid: prometheus
|
||||
tracesToLogsV2:
|
||||
datasourceUid: loki
|
||||
spanStartTimeShift: '-1h'
|
||||
spanEndTimeShift: '1h'
|
||||
filterByTraceID: true
|
||||
filterBySpanID: true
|
||||
tags: [ { key: 'service.name', value: 'job' } ]
|
|
@ -0,0 +1,15 @@
|
|||
global:
|
||||
scrape_interval: 10s
|
||||
evaluation_interval: 10s
|
||||
|
||||
scrape_configs:
|
||||
- job_name: 'otel-collector'
|
||||
static_configs:
|
||||
- targets: ['otel-collector:8889']
|
||||
|
||||
|
||||
# For direct metrics collection from service to Prometheus
|
||||
# - job_name: 'cloudserver-service-app'
|
||||
# metrics_path: /metrics
|
||||
# static_configs:
|
||||
# - targets: ['host.docker.internal:8002']
|
|
@ -0,0 +1,46 @@
|
|||
server:
|
||||
http_listen_port: 3200
|
||||
|
||||
distributor:
|
||||
receivers: # this configuration will listen on all ports and protocols that tempo is capable of.
|
||||
jaeger: # the receives all come from the OpenTelemetry collector. more configuration information can
|
||||
protocols: # be found there: https://github.com/open-telemetry/opentelemetry-collector/tree/main/receiver
|
||||
thrift_http: #
|
||||
grpc: # for a production deployment you should only enable the receivers you need!
|
||||
thrift_binary:
|
||||
thrift_compact:
|
||||
zipkin:
|
||||
otlp:
|
||||
protocols:
|
||||
http:
|
||||
grpc:
|
||||
opencensus:
|
||||
|
||||
ingester:
|
||||
max_block_duration: 5m # cut the headblock when this much time passes. this is being set for demo purposes and should probably be left alone normally
|
||||
|
||||
compactor:
|
||||
compaction:
|
||||
block_retention: 1h # overall Tempo trace retention. set for demo purposes
|
||||
|
||||
metrics_generator:
|
||||
registry:
|
||||
external_labels:
|
||||
source: tempo
|
||||
cluster: docker-compose
|
||||
storage:
|
||||
path: /tmp/tempo/generator/wal
|
||||
remote_write:
|
||||
- url: http://prometheus:9090/api/v1/write
|
||||
send_exemplars: true
|
||||
|
||||
storage:
|
||||
trace:
|
||||
backend: local # backend configuration to use
|
||||
wal:
|
||||
path: /tmp/tempo/wal # where to store the the wal locally
|
||||
local:
|
||||
path: /tmp/tempo/blocks
|
||||
|
||||
overrides:
|
||||
metrics_generator_processors: [service-graphs, span-metrics] # enables metrics generator
|
|
@ -0,0 +1,72 @@
|
|||
const opentelemetry = require('@opentelemetry/sdk-node');
|
||||
const { WebTracerProvider } = require('@opentelemetry/sdk-trace-web');
|
||||
const { Resource } = require('@opentelemetry/resources');
|
||||
const { PeriodicExportingMetricReader } = require('@opentelemetry/sdk-metrics');
|
||||
const { getNodeAutoInstrumentations } = require('@opentelemetry/auto-instrumentations-node');
|
||||
const { OTLPMetricExporter } = require('@opentelemetry/exporter-metrics-otlp-proto');
|
||||
const { OTLPTraceExporter } = require('@opentelemetry/exporter-trace-otlp-proto');
|
||||
const { BatchSpanProcessor } = require('@opentelemetry/sdk-trace-base');
|
||||
const { IORedisInstrumentation } = require('@opentelemetry/instrumentation-ioredis');
|
||||
|
||||
const {
|
||||
SEMRESATTRS_SERVICE_NAME,
|
||||
SEMRESATTRS_SERVICE_VERSION,
|
||||
} = require('@opentelemetry/semantic-conventions');
|
||||
|
||||
// Define resource with service name and version
|
||||
const resource = new Resource({
|
||||
[SEMRESATTRS_SERVICE_NAME]: process.env.OTEL_SERVICE_NAME || 'cloudserver-service',
|
||||
[SEMRESATTRS_SERVICE_VERSION]: '7.70.47',
|
||||
});
|
||||
|
||||
// OTLP Trace Exporter configuration
|
||||
const traceExporter = new OTLPTraceExporter({
|
||||
url: `http://${process.env.OPENTLEMETRY_COLLECTOR_HOST || 'localhost'}:${process.env.OPENTLEMETRY_COLLECTOR_PORT || 4318}/v1/traces`,
|
||||
headers: {},
|
||||
});
|
||||
|
||||
|
||||
// Metric Reader configuration
|
||||
const metricReader = new PeriodicExportingMetricReader({
|
||||
exporter: new OTLPMetricExporter({
|
||||
url: `http://${process.env.OPENTLEMETRY_COLLECTOR_HOST || 'localhost'}:${process.env.OPENTLEMETRY_COLLECTOR_PORT || 4318}/v1/metrics`,
|
||||
headers: {},
|
||||
concurrencyLimit: 1,
|
||||
}),
|
||||
});
|
||||
|
||||
// Node SDK configuration
|
||||
const sdk = new opentelemetry.NodeSDK({
|
||||
traceExporter,
|
||||
resource,
|
||||
metricReader,
|
||||
instrumentations: [
|
||||
getNodeAutoInstrumentations({
|
||||
'@opentelemetry/instrumentation-fs': {
|
||||
enabled: false,
|
||||
},
|
||||
'@opentelemetry/instrumentation-http': {
|
||||
responseHook: (span, operations) => {
|
||||
span.updateName(`${operations.req.protocol} ${operations.req.method} ${operations.req.path.split('&')[0]}`);
|
||||
},
|
||||
},
|
||||
}),
|
||||
new IORedisInstrumentation({
|
||||
requestHook: (span, { cmdName, cmdArgs }) => {
|
||||
span.updateName(`Redis:: ${cmdName.toUpperCase()} cache operation for ${cmdArgs[0].split(':')[0]}`);
|
||||
},
|
||||
// see under for available configuration
|
||||
}),
|
||||
],
|
||||
});
|
||||
|
||||
// Additional WebTracerProvider configuration
|
||||
// This will initialize TracerProvider that will let us create a Tracers
|
||||
const webTracerProvider = new WebTracerProvider({ resource });
|
||||
const webSpanProcessor = new BatchSpanProcessor(traceExporter);
|
||||
webTracerProvider.addSpanProcessor(webSpanProcessor);
|
||||
webTracerProvider.register();
|
||||
|
||||
// Start the Node SDK
|
||||
sdk.start();
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
const { auth, errors, policies } = require('arsenal');
|
||||
const async = require('async');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const bucketDelete = require('./bucketDelete');
|
||||
const bucketDeleteCors = require('./bucketDeleteCors');
|
||||
|
@ -75,7 +76,12 @@ auth.setHandler(vault);
|
|||
|
||||
/* eslint-disable no-param-reassign */
|
||||
const api = {
|
||||
callApiMethod(apiMethod, request, response, log, callback) {
|
||||
callApiMethod(apiMethod, request, response, log, callback, tracer) {
|
||||
return tracer.startActiveSpan('Using Cloudserver to processing API related operations', methodSpan => {
|
||||
methodSpan.addEvent('Cloudserver::callApiMethod() Processing API related operations');
|
||||
methodSpan.setAttribute('code.function', 'callApiMethod()');
|
||||
methodSpan.setAttribute('code.filepath', 'lib/api/api.js');
|
||||
methodSpan.setAttribute('code.lineno', 84);
|
||||
// Attach the apiMethod method to the request, so it can used by monitoring in the server
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
request.apiMethod = apiMethod;
|
||||
|
@ -186,12 +192,25 @@ const api = {
|
|||
}
|
||||
|
||||
return async.waterfall([
|
||||
next => auth.server.doAuth(
|
||||
next => tracer.startActiveSpan('Authentication of user against IAM', authUserSpan => {
|
||||
authUserSpan.setAttribute('code.function', 'auth.server.doAuth()');
|
||||
authUserSpan.setAttribute('code.filepath', 'lib/api/api.js');
|
||||
authUserSpan.setAttribute('code.lineno', 197);
|
||||
return next(null, authUserSpan);
|
||||
}),
|
||||
(authSpan, next) => auth.server.doAuth(
|
||||
request, log, (err, userInfo, authorizationResults, streamingV4Params) => {
|
||||
if (err) {
|
||||
log.trace('authentication error', { error: err });
|
||||
authSpan.end();
|
||||
return next(err);
|
||||
}
|
||||
const authNames = { accountName: userInfo.getAccountDisplayName() };
|
||||
authSpan.setAttribute('user.accountName', authNames.accountName);
|
||||
if (authNames.userName) {
|
||||
authSpan.setAttribute('user.userName', authNames.userName);
|
||||
}
|
||||
authSpan.end();
|
||||
return next(null, userInfo, authorizationResults, streamingV4Params);
|
||||
}, 's3', requestContexts),
|
||||
(userInfo, authorizationResults, streamingV4Params, next) => {
|
||||
|
@ -216,7 +235,6 @@ const api = {
|
|||
post.push(chunk);
|
||||
}
|
||||
});
|
||||
|
||||
request.on('error', err => {
|
||||
log.trace('error receiving request', {
|
||||
error: err,
|
||||
|
@ -244,6 +262,7 @@ const api = {
|
|||
apiMethod,
|
||||
log,
|
||||
(err, authResultsWithTags) => {
|
||||
methodSpan.addEvent('Authentication of user completed');
|
||||
if (err) {
|
||||
log.trace('tag authentication error', { error: err });
|
||||
return next(err);
|
||||
|
@ -271,19 +290,35 @@ const api = {
|
|||
return acc;
|
||||
}, {});
|
||||
}
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
methodSpan,
|
||||
);
|
||||
const apiSpan = tracer.startSpan(`API operation for ${apiMethod}`, undefined, ctx);
|
||||
if (apiMethod === 'objectPut' || apiMethod === 'objectPutPart') {
|
||||
request._response = response;
|
||||
return this[apiMethod](userInfo, request, streamingV4Params,
|
||||
log, callback, authorizationResults);
|
||||
log, callback, authorizationResults, apiSpan, methodSpan);
|
||||
}
|
||||
if (apiMethod === 'objectCopy' || apiMethod === 'objectPutCopyPart') {
|
||||
return this[apiMethod](userInfo, request, sourceBucket,
|
||||
sourceObject, sourceVersionId, log, callback);
|
||||
sourceObject, sourceVersionId, log, callback, apiSpan);
|
||||
}
|
||||
if (apiMethod === 'objectGet') {
|
||||
return this[apiMethod](userInfo, request, returnTagCount, log, callback);
|
||||
return this[apiMethod](userInfo, request, returnTagCount, log, callback, apiSpan, methodSpan);
|
||||
}
|
||||
return this[apiMethod](userInfo, request, log, callback);
|
||||
return this[apiMethod](userInfo, request, log, (err, res, cosrsHeaders) => {
|
||||
methodSpan.addEvent('Cloudserver::callApiMethod() API operation completed, sending response to client');
|
||||
apiSpan.end();
|
||||
return process.nextTick(() => {
|
||||
methodSpan.end();
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
return callback(null, res, cosrsHeaders);
|
||||
});
|
||||
}, apiSpan);
|
||||
});
|
||||
});
|
||||
},
|
||||
bucketDelete,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
const async = require('async');
|
||||
const { errors, s3middleware } = require('arsenal');
|
||||
const getMetaHeaders = s3middleware.userMetadata.getMetaHeaders;
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const constants = require('../../../../constants');
|
||||
const { data } = require('../../../data/wrapper');
|
||||
|
@ -46,8 +47,30 @@ function _checkAndApplyZenkoMD(metaHeaders, request, isDeleteMarker) {
|
|||
}
|
||||
|
||||
function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
||||
metadataStoreParams, dataToDelete, log, requestMethod, callback) {
|
||||
services.metadataStoreObject(bucketName, dataGetInfo,
|
||||
metadataStoreParams, dataToDelete, log, requestMethod, callback, ctx, tracer) {
|
||||
if (tracer) {
|
||||
return tracer.startActiveSpan('Updating metadata', undefined, ctx, metadataSpan => {
|
||||
return services.metadataStoreObject(bucketName, dataGetInfo,
|
||||
cipherBundle, metadataStoreParams, (err, result) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
}
|
||||
if (dataToDelete) {
|
||||
const newDataStoreName = Array.isArray(dataGetInfo) ?
|
||||
dataGetInfo[0].dataStoreName : null;
|
||||
return tracer.startActiveSpan('Deleting relevant metadadata', undefined, ctx, deleteMetadataSpan => {
|
||||
return data.batchDelete(dataToDelete, requestMethod,
|
||||
newDataStoreName, log, err => {
|
||||
deleteMetadataSpan.end();
|
||||
return callback(err, result, metadataSpan);
|
||||
});
|
||||
});
|
||||
}
|
||||
return callback(null, result);
|
||||
});
|
||||
});
|
||||
}
|
||||
return services.metadataStoreObject(bucketName, dataGetInfo,
|
||||
cipherBundle, metadataStoreParams, (err, result) => {
|
||||
if (err) {
|
||||
return callback(err);
|
||||
|
@ -85,7 +108,8 @@ function _storeInMDandDeleteData(bucketName, dataGetInfo, cipherBundle,
|
|||
*/
|
||||
function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
||||
canonicalID, cipherBundle, request, isDeleteMarker, streamingV4Params,
|
||||
overheadField, log, callback) {
|
||||
overheadField, log, callback, apiSpan, callAPIMethodSpan) {
|
||||
apiSpan.addEvent('preparing Object data and Metadata create object');
|
||||
const size = isDeleteMarker ? 0 : request.parsedContentLength;
|
||||
// although the request method may actually be 'DELETE' if creating a
|
||||
// delete marker, for our purposes we consider this to be a 'PUT'
|
||||
|
@ -203,17 +227,29 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
/* eslint-disable camelcase */
|
||||
const dontSkipBackend = externalBackends;
|
||||
/* eslint-enable camelcase */
|
||||
|
||||
apiSpan.addEvent('object data and metadata prepared, starting storage procedure');
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
apiSpan,
|
||||
);
|
||||
return async.waterfall([
|
||||
function storeData(next) {
|
||||
// const datastoreSpan = tracer.startSpan('Storing Data using sproxyd', undefined, ctx);
|
||||
if (size === 0 && !dontSkipBackend[locationType]) {
|
||||
metadataStoreParams.contentMD5 = constants.emptyFileMd5;
|
||||
return next(null, null, null);
|
||||
}
|
||||
return dataStore(objectKeyContext, cipherBundle, request, size,
|
||||
streamingV4Params, backendInfo, log, next);
|
||||
streamingV4Params, backendInfo, log, (err, dataGetInfo, calculatedHash, dataStoreSpan) => {
|
||||
if (dataStoreSpan) {
|
||||
dataStoreSpan.end();
|
||||
}
|
||||
return next(err, dataGetInfo, calculatedHash);
|
||||
}, ctx, tracer);
|
||||
},
|
||||
function processDataResult(dataGetInfo, calculatedHash, next) {
|
||||
const processDataSpan = tracer.startSpan('Processing data stored for generating MD', undefined, ctx);
|
||||
if (dataGetInfo === null || dataGetInfo === undefined) {
|
||||
return next(null, null);
|
||||
}
|
||||
|
@ -234,7 +270,10 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
cipherBundle.cipheredDataKey;
|
||||
}
|
||||
metadataStoreParams.contentMD5 = calculatedHash;
|
||||
return process.nextTick(() => {
|
||||
processDataSpan.end();
|
||||
return next(null, dataGetInfoArr);
|
||||
});
|
||||
},
|
||||
function getVersioningInfo(infoArr, next) {
|
||||
return versioningPreprocessing(bucketName, bucketMD,
|
||||
|
@ -262,7 +301,12 @@ function createAndStoreObject(bucketName, bucketMD, objectKey, objMD, authInfo,
|
|||
}
|
||||
return _storeInMDandDeleteData(bucketName, infoArr,
|
||||
cipherBundle, metadataStoreParams,
|
||||
options.dataToDelete, log, requestMethod, next);
|
||||
options.dataToDelete, log, requestMethod, (err, result, metadataSpan) => {
|
||||
if (metadataSpan) {
|
||||
metadataSpan.end();
|
||||
}
|
||||
return next(err, result);
|
||||
}, ctx, tracer);
|
||||
},
|
||||
], callback);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ const { prepareStream } = require('./prepareStream');
|
|||
* @return {function} - calls callback with arguments:
|
||||
* error, dataRetrievalInfo, and completedHash (if any)
|
||||
*/
|
||||
function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
|
||||
function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb, dataStoreSpan) {
|
||||
const contentMD5 = stream.contentMD5;
|
||||
const completedHash = hashedStream.completedHash;
|
||||
if (contentMD5 && completedHash && contentMD5 !== completedHash) {
|
||||
|
@ -36,7 +36,7 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
|
|||
return cb(errors.BadDigest);
|
||||
});
|
||||
}
|
||||
return cb(null, dataRetrievalInfo, completedHash);
|
||||
return cb(null, dataRetrievalInfo, completedHash, dataStoreSpan);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -56,12 +56,37 @@ function checkHashMatchMD5(stream, hashedStream, dataRetrievalInfo, log, cb) {
|
|||
* @return {undefined}
|
||||
*/
|
||||
function dataStore(objectContext, cipherBundle, stream, size,
|
||||
streamingV4Params, backendInfo, log, cb) {
|
||||
streamingV4Params, backendInfo, log, cb, ctx, tracer) {
|
||||
const cbOnce = jsutil.once(cb);
|
||||
const dataStream = prepareStream(stream, streamingV4Params, log, cbOnce);
|
||||
if (!dataStream) {
|
||||
return process.nextTick(() => cb(errors.InvalidArgument));
|
||||
}
|
||||
if (tracer) {
|
||||
return tracer.startActiveSpan('Storing Data using sproxyd', undefined, ctx, dataStoreSpan => {
|
||||
return data.put(
|
||||
cipherBundle, dataStream, size, objectContext, backendInfo, log,
|
||||
(err, dataRetrievalInfo, hashedStream) => {
|
||||
if (err) {
|
||||
log.error('error in datastore', {
|
||||
error: err,
|
||||
});
|
||||
return cbOnce(err);
|
||||
}
|
||||
if (!dataRetrievalInfo) {
|
||||
log.fatal('data put returned neither an error nor a key', {
|
||||
method: 'storeObject::dataStore',
|
||||
});
|
||||
return cbOnce(errors.InternalError);
|
||||
}
|
||||
log.trace('dataStore: backend stored key', {
|
||||
dataRetrievalInfo,
|
||||
});
|
||||
return checkHashMatchMD5(stream, hashedStream,
|
||||
dataRetrievalInfo, log, cbOnce, dataStoreSpan);
|
||||
});
|
||||
});
|
||||
}
|
||||
return data.put(
|
||||
cipherBundle, dataStream, size, objectContext, backendInfo, log,
|
||||
(err, dataRetrievalInfo, hashedStream) => {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const querystring = require('querystring');
|
||||
const { errors, versioning, s3middleware } = require('arsenal');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const constants = require('../../constants');
|
||||
const services = require('../services');
|
||||
|
@ -261,7 +262,12 @@ function processMasterVersions(bucketName, listParams, list) {
|
|||
}
|
||||
|
||||
function handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||
bucketName, list, corsHeaders, log, callback) {
|
||||
bucketName, list, corsHeaders, log, callback, traceContext) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const handleResultSpan = tracer.startSpan('Preparing result', undefined, traceContext);
|
||||
handleResultSpan.setAttribute('code.function', 'handleResult()');
|
||||
handleResultSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
|
||||
handleResultSpan.setAttribute('code.lineno', 270);
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
listParams.maxKeys = requestMaxKeys;
|
||||
// eslint-disable-next-line no-param-reassign
|
||||
|
@ -275,6 +281,7 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
|||
}
|
||||
pushMetric('listBucket', log, { authInfo, bucket: bucketName });
|
||||
monitoring.promMetrics('GET', bucketName, '200', 'listBucket');
|
||||
handleResultSpan.end();
|
||||
return callback(null, res, corsHeaders);
|
||||
}
|
||||
|
||||
|
@ -288,7 +295,16 @@ function handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
|||
* with either error code or xml response body
|
||||
* @return {undefined}
|
||||
*/
|
||||
function bucketGet(authInfo, request, log, callback) {
|
||||
function bucketGet(authInfo, request, log, callback, parentSpan) {
|
||||
parentSpan.addEvent('Cloudserver::bucketGet() processing bucketGet request');
|
||||
parentSpan.setAttribute('code.function', 'bucketGet()');
|
||||
parentSpan.setAttribute('code.filepath', 'lib/api/bucketGet.js');
|
||||
parentSpan.setAttribute('code.lineno', 299);
|
||||
// const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
parentSpan,
|
||||
);
|
||||
const params = request.query;
|
||||
const bucketName = request.bucketName;
|
||||
const v2 = params['list-type'];
|
||||
|
@ -350,7 +366,7 @@ function bucketGet(authInfo, request, log, callback) {
|
|||
} else {
|
||||
listParams.marker = params.marker;
|
||||
}
|
||||
|
||||
// parentSpan.addEvent('Cloudserver::bucketGet() validating Metadata');
|
||||
standardMetadataValidateBucket(metadataValParams, request.actionImplicitDenies, log, (err, bucket) => {
|
||||
const corsHeaders = collectCorsHeaders(request.headers.origin,
|
||||
request.method, bucket);
|
||||
|
@ -367,7 +383,9 @@ function bucketGet(authInfo, request, log, callback) {
|
|||
listParams.versionIdMarker = params['version-id-marker'] ?
|
||||
versionIdUtils.decode(params['version-id-marker']) : undefined;
|
||||
}
|
||||
// parentSpan.addEvent('Cloudserver::bucketGet() Getting object listing');
|
||||
if (!requestMaxKeys) {
|
||||
parentSpan.addEvent('Cloudserver::bucketGet() Empty bucket');
|
||||
const emptyList = {
|
||||
CommonPrefixes: [],
|
||||
Contents: [],
|
||||
|
@ -375,7 +393,7 @@ function bucketGet(authInfo, request, log, callback) {
|
|||
IsTruncated: false,
|
||||
};
|
||||
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||
bucketName, emptyList, corsHeaders, log, callback);
|
||||
bucketName, emptyList, corsHeaders, log, callback, ctx);
|
||||
}
|
||||
return services.getObjectListing(bucketName, listParams, log,
|
||||
(err, list) => {
|
||||
|
@ -386,9 +404,9 @@ function bucketGet(authInfo, request, log, callback) {
|
|||
return callback(err, null, corsHeaders);
|
||||
}
|
||||
return handleResult(listParams, requestMaxKeys, encoding, authInfo,
|
||||
bucketName, list, corsHeaders, log, callback);
|
||||
});
|
||||
});
|
||||
bucketName, list, corsHeaders, log, callback, ctx);
|
||||
}, ctx);
|
||||
}, ctx);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
||||
const { standardMetadataValidateBucket } = require('../metadata/metadataUtils');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const { pushMetric } = require('../utapi/utilities');
|
||||
const monitoring = require('../utilities/metrics');
|
||||
|
@ -13,7 +14,15 @@ const monitoring = require('../utilities/metrics');
|
|||
* with either error code or success
|
||||
* @return {undefined}
|
||||
*/
|
||||
function bucketHead(authInfo, request, log, callback) {
|
||||
function bucketHead(authInfo, request, log, callback, parentSpan) {
|
||||
parentSpan.addEvent('Cloudserver::bucketGet() processing Head Bucket request');
|
||||
parentSpan.setAttribute('code.function', 'bucketHead()');
|
||||
parentSpan.setAttribute('code.filepath', 'lib/api/bucketHead.js');
|
||||
parentSpan.setAttribute('code.lineno', 20);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
parentSpan,
|
||||
);
|
||||
log.debug('processing request', { method: 'bucketHead' });
|
||||
const bucketName = request.bucketName;
|
||||
const metadataValParams = {
|
||||
|
@ -38,7 +47,7 @@ function bucketHead(authInfo, request, log, callback) {
|
|||
'x-amz-bucket-region': bucket.getLocationConstraint(),
|
||||
};
|
||||
return callback(null, Object.assign(corsHeaders, headers));
|
||||
});
|
||||
}, ctx);
|
||||
}
|
||||
|
||||
module.exports = bucketHead;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
/* eslint-disable indent */
|
||||
const async = require('async');
|
||||
const { errors, versioning } = require('arsenal');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
||||
const services = require('../services');
|
||||
|
@ -28,7 +29,16 @@ const { overheadField } = require('../../constants');
|
|||
* @param {function} cb - final cb to call with the result and response headers
|
||||
* @return {undefined}
|
||||
*/
|
||||
function objectDelete(authInfo, request, log, cb) {
|
||||
function objectDelete(authInfo, request, log, cb, apiSpan) {
|
||||
apiSpan.addEvent('Cloudserver::objectGet() processing objectPut request');
|
||||
apiSpan.setAttribute('code.function', 'objectPut()');
|
||||
apiSpan.setAttribute('code.filepath', 'lib/api/objectPut.js');
|
||||
apiSpan.setAttribute('code.lineno', 50);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
apiSpan,
|
||||
);
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
log.debug('processing request', { method: 'objectDelete' });
|
||||
if (authInfo.isRequesterPublicUser()) {
|
||||
log.debug('operation not available for public user');
|
||||
|
@ -97,9 +107,10 @@ function objectDelete(authInfo, request, log, cb) {
|
|||
});
|
||||
}
|
||||
return next(null, bucketMD, objMD);
|
||||
});
|
||||
}, ctx);
|
||||
},
|
||||
function checkGovernanceBypassHeader(bucketMD, objectMD, next) {
|
||||
apiSpan.addEvent('Check governance bypass header');
|
||||
// AWS only returns an object lock error if a version id
|
||||
// is specified, else continue to create a delete marker
|
||||
if (!reqVersionId) {
|
||||
|
@ -112,12 +123,15 @@ function objectDelete(authInfo, request, log, cb) {
|
|||
log.debug('user does not have BypassGovernanceRetention and object is locked');
|
||||
return next(err, bucketMD);
|
||||
}
|
||||
apiSpan.addEvent('Checked governance bypass header');
|
||||
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
||||
});
|
||||
}
|
||||
apiSpan.addEvent('Checked governance bypass header');
|
||||
return next(null, hasGovernanceBypass, bucketMD, objectMD);
|
||||
},
|
||||
function evaluateObjectLockPolicy(hasGovernanceBypass, bucketMD, objectMD, next) {
|
||||
apiSpan.addEvent('Evaluate object lock policy');
|
||||
// AWS only returns an object lock error if a version id
|
||||
// is specified, else continue to create a delete marker
|
||||
if (!reqVersionId) {
|
||||
|
@ -135,7 +149,7 @@ function objectDelete(authInfo, request, log, cb) {
|
|||
log.debug('trying to delete locked object');
|
||||
return next(objectLockedError, bucketMD);
|
||||
}
|
||||
|
||||
apiSpan.addEvent('Object lock policy evaluation done');
|
||||
return next(null, bucketMD, objectMD);
|
||||
},
|
||||
function deleteOperation(bucketMD, objectMD, next) {
|
||||
|
@ -161,10 +175,14 @@ function objectDelete(authInfo, request, log, cb) {
|
|||
if (!_bucketRequiresOplogUpdate(bucketMD)) {
|
||||
delOptions.doesNotNeedOpogUpdate = true;
|
||||
}
|
||||
|
||||
// return tracer.startActiveSpan('Deleting object', undefined, ctx, deleteObjectSpan => services.deleteObject(
|
||||
// bucketName, objectMD, objectKey, delOptions, false, log, (err, delResult) => {
|
||||
// deleteObjectSpan.end();
|
||||
// return next(err, bucketMD, objectMD, delResult, deleteInfo);
|
||||
// }));
|
||||
return services.deleteObject(bucketName, objectMD, objectKey,
|
||||
delOptions, false, log, (err, delResult) => next(err, bucketMD,
|
||||
objectMD, delResult, deleteInfo));
|
||||
objectMD, delResult, deleteInfo), ctx);
|
||||
}
|
||||
// putting a new delete marker
|
||||
deleteInfo.newDeleteMarker = true;
|
||||
|
@ -257,6 +275,7 @@ function objectDelete(authInfo, request, log, cb) {
|
|||
monitoring.promMetrics('DELETE', bucketName, '200', 'deleteObject',
|
||||
Number.parseInt(objectMD['content-length'], 10));
|
||||
}
|
||||
apiSpan.end();
|
||||
return cb(err, resHeaders);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const { errors, s3middleware } = require('arsenal');
|
||||
const { parseRange } = require('arsenal').network.http.utils;
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const { data } = require('../data/wrapper');
|
||||
|
||||
|
@ -25,7 +26,15 @@ const validateHeaders = s3middleware.validateConditionalHeaders;
|
|||
* @param {function} callback - callback to function in route
|
||||
* @return {undefined}
|
||||
*/
|
||||
function objectGet(authInfo, request, returnTagCount, log, callback) {
|
||||
function objectGet(authInfo, request, returnTagCount, log, callback, apiSpan, callAPIMethodSpan) {
|
||||
apiSpan.addEvent('Cloudserver::objectGet() processing objectGet request');
|
||||
apiSpan.setAttribute('code.function', 'objectGet()');
|
||||
apiSpan.setAttribute('code.filepath', 'lib/api/objectGet.js');
|
||||
apiSpan.setAttribute('code.lineno', 32);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
apiSpan,
|
||||
);
|
||||
log.debug('processing request', { method: 'objectGet' });
|
||||
const bucketName = request.bucketName;
|
||||
const objectKey = request.objectKey;
|
||||
|
@ -64,6 +73,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
|
|||
return callback(err, null, corsHeaders);
|
||||
}
|
||||
if (!objMD) {
|
||||
apiSpan.addEvent(versionId ? 'Cloudserver::objectGet() Object version not found' : 'Cloudserver::objectGet() Object not found');
|
||||
const err = versionId ? errors.NoSuchVersion : errors.NoSuchKey;
|
||||
monitoring.promMetrics(
|
||||
'GET', bucketName, err.code, 'getObject');
|
||||
|
@ -87,6 +97,7 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
|
|||
return callback(errors.MethodNotAllowed, null,
|
||||
responseMetaHeaders);
|
||||
}
|
||||
apiSpan.addEvent('Cloudserver::bucketGet() Validating API request headers against object metadata');
|
||||
const headerValResult = validateHeaders(request.headers,
|
||||
objMD['last-modified'], objMD['content-md5']);
|
||||
if (headerValResult.error) {
|
||||
|
@ -233,7 +244,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
|
|||
dataLocator = setPartRanges(dataLocator, byteRange);
|
||||
}
|
||||
}
|
||||
apiSpan.addEvent('Cloudserver::bucketGet() Trying to get data location using head request');
|
||||
return data.head(dataLocator, log, err => {
|
||||
apiSpan.addEvent('Cloudserver::bucketGet() Data location found');
|
||||
if (err) {
|
||||
log.error('error from external backend checking for ' +
|
||||
'object existence', { error: err });
|
||||
|
@ -253,9 +266,9 @@ function objectGet(authInfo, request, returnTagCount, log, callback) {
|
|||
monitoring.promMetrics('GET', bucketName, '200', 'getObject',
|
||||
Number.parseInt(responseMetaHeaders['Content-Length'], 10));
|
||||
return callback(null, dataLocator, responseMetaHeaders,
|
||||
byteRange);
|
||||
});
|
||||
byteRange, apiSpan, callAPIMethodSpan);
|
||||
});
|
||||
}, ctx);
|
||||
}
|
||||
|
||||
module.exports = objectGet;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
const { errors, s3middleware } = require('arsenal');
|
||||
const validateHeaders = s3middleware.validateConditionalHeaders;
|
||||
const { parseRange } = require('arsenal').network.http.utils;
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const { decodeVersionId } = require('./apiUtils/object/versioning');
|
||||
const collectCorsHeaders = require('../utilities/collectCorsHeaders');
|
||||
|
@ -25,7 +26,15 @@ const { setExpirationHeaders } = require('./apiUtils/object/expirationHeaders');
|
|||
* @return {undefined}
|
||||
*
|
||||
*/
|
||||
function objectHead(authInfo, request, log, callback) {
|
||||
function objectHead(authInfo, request, log, callback, parentSpan) {
|
||||
parentSpan.addEvent('Cloudserver::bucketGet() processing Head Object request');
|
||||
parentSpan.setAttribute('code.function', 'objectHead()');
|
||||
parentSpan.setAttribute('code.filepath', 'lib/api/objectHead.js');
|
||||
parentSpan.setAttribute('code.lineno', 32);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
parentSpan,
|
||||
);
|
||||
log.debug('processing request', { method: 'objectHead' });
|
||||
const bucketName = request.bucketName;
|
||||
const objectKey = request.objectKey;
|
||||
|
@ -160,7 +169,7 @@ function objectHead(authInfo, request, log, callback) {
|
|||
});
|
||||
monitoring.promMetrics('HEAD', bucketName, '200', 'headObject');
|
||||
return callback(null, responseHeaders);
|
||||
});
|
||||
}, ctx);
|
||||
}
|
||||
|
||||
module.exports = objectHead;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const async = require('async');
|
||||
const { errors, versioning } = require('arsenal');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const aclUtils = require('../utilities/aclUtils');
|
||||
const { cleanUpBucket } = require('./apiUtils/bucket/bucketCreation');
|
||||
|
@ -39,9 +40,19 @@ const versionIdUtils = versioning.VersionID;
|
|||
* (to be used for streaming v4 auth if applicable)
|
||||
* @param {object} log - the log request
|
||||
* @param {Function} callback - final callback to call with the result
|
||||
* @param {object} apiSpan - APi span
|
||||
* @param {object} methodSpan - method span
|
||||
* @return {undefined}
|
||||
*/
|
||||
function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
||||
function objectPut(authInfo, request, streamingV4Params, log, callback, authorizationResults, apiSpan, methodSpan) {
|
||||
apiSpan.addEvent('Cloudserver::objectGet() processing objectPut request');
|
||||
apiSpan.setAttribute('code.function', 'objectPut()');
|
||||
apiSpan.setAttribute('code.filepath', 'lib/api/objectPut.js');
|
||||
apiSpan.setAttribute('code.lineno', 50);
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
apiSpan,
|
||||
);
|
||||
log.debug('processing request', { method: 'objectPut' });
|
||||
const {
|
||||
bucketName,
|
||||
|
@ -101,12 +112,14 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
|
||||
return async.waterfall([
|
||||
function handleTransientOrDeleteBuckets(next) {
|
||||
apiSpan.addEvent('Cloudserver:: handleTransientOrDeleteBuckets');
|
||||
if (bucket.hasTransientFlag() || bucket.hasDeletedFlag()) {
|
||||
return cleanUpBucket(bucket, canonicalID, log, next);
|
||||
}
|
||||
return next();
|
||||
},
|
||||
function getSSEConfig(next) {
|
||||
apiSpan.addEvent('Cloudserver:: handleTransientOrDeleteBuckets');
|
||||
return getObjectSSEConfiguration(headers, bucket, log,
|
||||
(err, sseConfig) => {
|
||||
if (err) {
|
||||
|
@ -125,17 +138,20 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
return next(null, null);
|
||||
},
|
||||
function objectCreateAndStore(cipherBundle, next) {
|
||||
apiSpan.addEvent('Cloudserver:: handleTransientOrDeleteBuckets');
|
||||
const objectLockValidationError
|
||||
= validateHeaders(bucket, headers, log);
|
||||
if (objectLockValidationError) {
|
||||
return next(objectLockValidationError);
|
||||
}
|
||||
writeContinue(request, request._response);
|
||||
apiSpan.addEvent('Cloudserver:: creating and storing object');
|
||||
return createAndStoreObject(bucketName,
|
||||
bucket, objectKey, objMD, authInfo, canonicalID, cipherBundle,
|
||||
request, false, streamingV4Params, overheadField, log, next);
|
||||
request, false, streamingV4Params, overheadField, log, next, apiSpan, methodSpan);
|
||||
},
|
||||
], (err, storingResult) => {
|
||||
apiSpan.addEvent('Cloudserver:: stored object metadata and data');
|
||||
if (err) {
|
||||
monitoring.promMetrics('PUT', bucketName, err.code,
|
||||
'putObject');
|
||||
|
@ -150,7 +166,7 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
&& !Number.isNaN(request.headers['x-amz-meta-size']))
|
||||
? Number.parseInt(request.headers['x-amz-meta-size'], 10) : null;
|
||||
const newByteLength = parsedContentLength;
|
||||
|
||||
apiSpan.addEvent('Cloudserver:: preparing response headers');
|
||||
setExpirationHeaders(responseHeaders, {
|
||||
lifecycleConfig: bucket.getLifecycleConfiguration(),
|
||||
objectParams: {
|
||||
|
@ -199,12 +215,17 @@ function objectPut(authInfo, request, streamingV4Params, log, callback) {
|
|||
location: bucket.getLocationConstraint(),
|
||||
numberOfObjects,
|
||||
});
|
||||
apiSpan.end();
|
||||
monitoring.promMetrics('PUT', bucketName, '200',
|
||||
'putObject', newByteLength, oldByteLength, isVersionedObj,
|
||||
null, ingestSize);
|
||||
return process.nextTick(() => {
|
||||
methodSpan.addEvent('responding to client');
|
||||
methodSpan.end();
|
||||
return callback(null, responseHeaders);
|
||||
});
|
||||
});
|
||||
}, ctx);
|
||||
}
|
||||
|
||||
module.exports = objectPut;
|
||||
|
|
|
@ -51,8 +51,11 @@ function generateXml(xml, owner, userBuckets, splitter) {
|
|||
* @param {function} callback - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
function serviceGet(authInfo, request, log, callback) {
|
||||
function serviceGet(authInfo, request, log, callback, parentSpan) {
|
||||
log.debug('processing request', { method: 'serviceGet' });
|
||||
parentSpan.setAttribute('code.function', 'serviceGet()');
|
||||
parentSpan.setAttribute('code.filepath', 'lib/api/serviceGet.js');
|
||||
parentSpan.setAttribute('code.lineno', 58);
|
||||
|
||||
if (authInfo.isRequesterPublicUser()) {
|
||||
log.debug('operation not available for public user');
|
||||
|
@ -73,6 +76,7 @@ function serviceGet(authInfo, request, log, callback) {
|
|||
'</Owner>',
|
||||
'<Buckets>'
|
||||
);
|
||||
parentSpan.addEvent('getting metadata');
|
||||
return services.getService(authInfo, request, log, constants.splitter,
|
||||
(err, userBuckets, splitter) => {
|
||||
if (err) {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const async = require('async');
|
||||
const { errors } = require('arsenal');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
|
||||
const metadata = require('./wrapper');
|
||||
const BucketInfo = require('arsenal').models.BucketInfo;
|
||||
|
@ -181,9 +182,15 @@ function validateBucket(bucket, params, log, actionImplicitDenies = {}) {
|
|||
* @param {boolean} actionImplicitDenies - identity authorization results
|
||||
* @param {RequestLogger} log - request logger
|
||||
* @param {function} callback - callback
|
||||
* @param {object} tracerContext - tracing context
|
||||
* @return {undefined} - and call callback with params err, bucket md
|
||||
*/
|
||||
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback) {
|
||||
function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log, callback, tracerContext) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const bucketPolicyAuthSpan = tracer.startSpan('Retrieving Metadata and validating user against bucket policy', undefined, tracerContext);
|
||||
bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucketAndObj()');
|
||||
bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
|
||||
bucketPolicyAuthSpan.setAttribute('code.lineno', 193);
|
||||
const { authInfo, bucketName, objectKey, versionId, getDeleteMarker, request } = params;
|
||||
let requestType = params.requestType;
|
||||
if (!Array.isArray(requestType)) {
|
||||
|
@ -196,7 +203,15 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
|||
if (getDeleteMarker) {
|
||||
getOptions.getDeleteMarker = true;
|
||||
}
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket and object');
|
||||
const ctx = opentelemetry.trace.setSpan(
|
||||
opentelemetry.context.active(),
|
||||
bucketPolicyAuthSpan,
|
||||
);
|
||||
const metadataSpan = tracer.startSpan('Retrieving metadata', undefined, ctx);
|
||||
return metadata.getBucketAndObjectMD(bucketName, objectKey, getOptions, log, (err, getResult) => {
|
||||
metadataSpan.end();
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
|
||||
if (err) {
|
||||
// if some implicit iamAuthzResults, return AccessDenied
|
||||
// before leaking any state information
|
||||
|
@ -209,6 +224,7 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
|||
});
|
||||
},
|
||||
(getResult, next) => {
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating metadata of bucket and object');
|
||||
const bucket = getResult.bucket ?
|
||||
BucketInfo.deSerialize(getResult.bucket) : undefined;
|
||||
if (!bucket) {
|
||||
|
@ -227,9 +243,11 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
|||
return getNullVersionFromMaster(bucketName, objectKey, log,
|
||||
(err, nullVer) => next(err, bucket, nullVer));
|
||||
}
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: bucket and object metadata validated');
|
||||
return next(null, bucket, objMD);
|
||||
},
|
||||
(bucket, objMD, next) => {
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy');
|
||||
const canonicalID = authInfo.getCanonicalID();
|
||||
if (!isObjAuthorized(bucket, objMD, requestType, canonicalID, authInfo, log, request,
|
||||
actionImplicitDenies)) {
|
||||
|
@ -239,13 +257,18 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
|||
return next(null, bucket, objMD);
|
||||
},
|
||||
], (err, bucket, objMD) => {
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
|
||||
if (err) {
|
||||
// still return bucket for cors headers
|
||||
return callback(err, bucket);
|
||||
}
|
||||
return process.nextTick(() => {
|
||||
bucketPolicyAuthSpan.end();
|
||||
return callback(null, bucket, objMD);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
/** standardMetadataValidateBucket - retrieve bucket from metadata and check if user
|
||||
* is authorized to access it
|
||||
* @param {object} params - function parameters
|
||||
|
@ -256,11 +279,20 @@ function standardMetadataValidateBucketAndObj(params, actionImplicitDenies, log,
|
|||
* @param {boolean} actionImplicitDenies - identity authorization results
|
||||
* @param {RequestLogger} log - request logger
|
||||
* @param {function} callback - callback
|
||||
* @param {object} tracerContext - tracing context
|
||||
* @return {undefined} - and call callback with params err, bucket md
|
||||
*/
|
||||
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback) {
|
||||
function standardMetadataValidateBucket(params, actionImplicitDenies, log, callback, tracerContext) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const bucketPolicyAuthSpan = tracer.startSpan('Validating user against bucket policy', undefined, tracerContext);
|
||||
bucketPolicyAuthSpan.setAttribute('code.function', 'standardMetadataValidateBucket()');
|
||||
bucketPolicyAuthSpan.setAttribute('code.filepath', 'lib/metadata/metadataUtils.js');
|
||||
bucketPolicyAuthSpan.setAttribute('code.lineno', 267);
|
||||
|
||||
const { bucketName } = params;
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieving metadata of bucket');
|
||||
return metadata.getBucket(bucketName, log, (err, bucket) => {
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: retrieved metadata of bucket');
|
||||
if (err) {
|
||||
// if some implicit actionImplicitDenies, return AccessDenied before
|
||||
// leaking any state information
|
||||
|
@ -270,9 +302,14 @@ function standardMetadataValidateBucket(params, actionImplicitDenies, log, callb
|
|||
log.debug('metadata getbucket failed', { error: err });
|
||||
return callback(err);
|
||||
}
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: validating user against bucket policy2');
|
||||
const validationError = validateBucket(bucket, params, log, actionImplicitDenies);
|
||||
bucketPolicyAuthSpan.addEvent('Cloudserver:: user validation done against bucket policy');
|
||||
return process.nextTick(() => {
|
||||
bucketPolicyAuthSpan.end();
|
||||
return callback(validationError, bucket);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
@ -3,6 +3,7 @@ const https = require('https');
|
|||
const cluster = require('cluster');
|
||||
const { series } = require('async');
|
||||
const arsenal = require('arsenal');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
const { RedisClient, StatsClient } = arsenal.metrics;
|
||||
const monitoringClient = require('./utilities/monitoringHandler');
|
||||
const metrics = require('./utilities/metrics');
|
||||
|
@ -89,6 +90,16 @@ class S3Server {
|
|||
* @returns {void}
|
||||
*/
|
||||
routeRequest(req, res) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
let activeSpan = opentelemetry.trace.getActiveSpan(opentelemetry.context.active());
|
||||
if (!activeSpan) {
|
||||
activeSpan = tracer.startSpan('routeRequest');
|
||||
}
|
||||
|
||||
activeSpan.addEvent('Cloudserver::routeRequest() Starting routing of request');
|
||||
activeSpan.setAttribute('code.function', 'routeRequest()');
|
||||
activeSpan.setAttribute('code.filepath', 'lib/server.js');
|
||||
activeSpan.setAttribute('code.lineno', 98);
|
||||
metrics.httpActiveRequests.inc();
|
||||
const requestStartTime = process.hrtime.bigint();
|
||||
|
||||
|
@ -140,7 +151,18 @@ class S3Server {
|
|||
vault,
|
||||
},
|
||||
};
|
||||
routes(req, res, params, logger, _config);
|
||||
// activeSpan.addEvent('I am going to route the request now');
|
||||
// activeSpan.setAttribute('s3.bucket_name', req.bucketName);
|
||||
// activeSpan.setAttribute('s3.object_key', req.objectKey);
|
||||
// activeSpan.updateName(`${req.method} ${req.bucketName}/${req.objectKey}`);
|
||||
// const v4Span = tracer.startSpan('verifySignatureV4', {
|
||||
// parent: activeSpan,
|
||||
// });
|
||||
routes(req, res, params, logger, _config, activeSpan, tracer);
|
||||
// return tracer.startActiveSpan('routeRequest', span => {
|
||||
// routes(req, res, params, logger, _config);
|
||||
// span.end();
|
||||
// });
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
const assert = require('assert');
|
||||
|
||||
const async = require('async');
|
||||
const opentelemetry = require('@opentelemetry/api');
|
||||
const { errors, s3middleware } = require('arsenal');
|
||||
|
||||
const ObjectMD = require('arsenal').models.ObjectMD;
|
||||
|
@ -308,12 +309,49 @@ const services = {
|
|||
* @param {function} cb - callback from async.waterfall in objectGet
|
||||
* @return {undefined}
|
||||
*/
|
||||
deleteObject(bucketName, objectMD, objectKey, options, deferLocationDeletion, log, cb) {
|
||||
deleteObject(bucketName, objectMD, objectKey, options, deferLocationDeletion, log, cb, tracerContext) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
|
||||
log.trace('deleting object from bucket');
|
||||
assert.strictEqual(typeof bucketName, 'string');
|
||||
assert.strictEqual(typeof objectMD, 'object');
|
||||
|
||||
function deleteMDandData() {
|
||||
if (tracerContext) {
|
||||
return tracer.startActiveSpan('Delete object metadata', undefined, tracerContext, deleteMdSpan => {
|
||||
return metadata.deleteObjectMD(bucketName, objectKey, options, log,
|
||||
(err, res) => {
|
||||
deleteMdSpan.addEvent('Delete object metadata');
|
||||
deleteMdSpan.end();
|
||||
if (err) {
|
||||
return cb(err, res);
|
||||
}
|
||||
log.trace('deleteObject: metadata delete OK');
|
||||
if (objectMD.location === null) {
|
||||
return cb(null, res);
|
||||
}
|
||||
|
||||
if (deferLocationDeletion) {
|
||||
return cb(null, Array.isArray(objectMD.location)
|
||||
? objectMD.location : [objectMD.location]);
|
||||
}
|
||||
|
||||
if (!Array.isArray(objectMD.location)) {
|
||||
data.delete(objectMD.location, log);
|
||||
return cb(null, res);
|
||||
}
|
||||
return tracer.startActiveSpan('Batch Deleting Data', undefined, tracerContext, deleteDataSpan => {
|
||||
return data.batchDelete(objectMD.location, null, null, log, err => {
|
||||
deleteDataSpan.end();
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return cb(null, res);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
return metadata.deleteObjectMD(bucketName, objectKey, options, log,
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
|
@ -333,7 +371,6 @@ const services = {
|
|||
data.delete(objectMD.location, log);
|
||||
return cb(null, res);
|
||||
}
|
||||
|
||||
return data.batchDelete(objectMD.location, null, null, log, err => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
|
@ -346,6 +383,19 @@ const services = {
|
|||
const objGetInfo = objectMD.location;
|
||||
// special case that prevents azure blocks from unecessary deletion
|
||||
// will return null if no need
|
||||
if (tracerContext) {
|
||||
return tracer.startActiveSpan('Checking if object is stored in Azure', undefined, tracerContext, objectDeleteSpan => {
|
||||
return data.protectAzureBlocks(bucketName, objectKey, objGetInfo,
|
||||
log, err => {
|
||||
objectDeleteSpan.addEvent('Checking if object is stored in Azure');
|
||||
objectDeleteSpan.end();
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
return deleteMDandData();
|
||||
});
|
||||
});
|
||||
}
|
||||
return data.protectAzureBlocks(bucketName, objectKey, objGetInfo,
|
||||
log, err => {
|
||||
if (err) {
|
||||
|
@ -365,12 +415,18 @@ const services = {
|
|||
* @return {undefined}
|
||||
* JSON response from metastore
|
||||
*/
|
||||
getObjectListing(bucketName, listingParams, log, cb) {
|
||||
getObjectListing(bucketName, listingParams, log, cb, tracerContext) {
|
||||
const tracer = opentelemetry.trace.getTracer(process.env.OTEL_SERVICE_NAME);
|
||||
const objectListSpan = tracer.startSpan('Listing objects from metadata', undefined, tracerContext);
|
||||
objectListSpan.setAttribute('code.function', 'getObjectListing()');
|
||||
objectListSpan.setAttribute('code.filepath', 'lib/services.js');
|
||||
objectListSpan.setAttribute('code.lineno', 374);
|
||||
assert.strictEqual(typeof bucketName, 'string');
|
||||
log.trace('performing metadata get object listing',
|
||||
{ listingParams });
|
||||
metadata.listObject(bucketName, listingParams, log,
|
||||
(err, listResponse) => {
|
||||
objectListSpan.end();
|
||||
if (err) {
|
||||
log.debug('error from metadata', { error: err });
|
||||
return cb(err);
|
||||
|
|
15
package.json
15
package.json
|
@ -20,7 +20,18 @@
|
|||
"homepage": "https://github.com/scality/S3#readme",
|
||||
"dependencies": {
|
||||
"@hapi/joi": "^17.1.0",
|
||||
"arsenal": "git+https://github.com/scality/arsenal#7.70.29",
|
||||
"@opentelemetry/api": "^1.8.0",
|
||||
"@opentelemetry/auto-instrumentations-node": "^0.46.1",
|
||||
"@opentelemetry/exporter-metrics-otlp-proto": "^0.52.0",
|
||||
"@opentelemetry/exporter-trace-otlp-proto": "^0.52.0",
|
||||
"@opentelemetry/instrumentation-ioredis": "^0.41.0",
|
||||
"@opentelemetry/resources": "^1.24.1",
|
||||
"@opentelemetry/sdk-metrics": "^1.24.1",
|
||||
"@opentelemetry/sdk-node": "^0.51.1",
|
||||
"@opentelemetry/sdk-trace-node": "^1.24.1",
|
||||
"@opentelemetry/sdk-trace-web": "^1.25.0",
|
||||
"@opentelemetry/semantic-conventions": "^1.24.1",
|
||||
"arsenal": "git+https://github.com/scality/Arsenal#1abae5844aaf6a8a9782a30faffda5205760e3b2",
|
||||
"async": "~2.5.0",
|
||||
"aws-sdk": "2.905.0",
|
||||
"azure-storage": "^2.1.0",
|
||||
|
@ -84,7 +95,7 @@
|
|||
"start": "npm-run-all --parallel start_dmd start_s3server",
|
||||
"start_mdserver": "node mdserver.js",
|
||||
"start_dataserver": "node dataserver.js",
|
||||
"start_s3server": "node index.js",
|
||||
"start_s3server": "node --require ./instrumentation.js index.js",
|
||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||
"start_utapi": "node lib/utapi/utapi.js",
|
||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
||||
|
|
Loading…
Reference in New Issue