Compare commits

..

1 Commits

Author SHA1 Message Date
scality-gelbart 772be08d1f
do not error 500 in case of negative metric 2022-11-04 15:57:51 +01:00
46 changed files with 5612 additions and 581 deletions

View File

@ -1,7 +1,8 @@
FROM ghcr.io/scality/vault:c2607856
FROM registry.scality.com/vault-dev/vault:c2607856
ENV VAULT_DB_BACKEND LEVELDB
RUN chmod 400 tests/utils/keyfile
ENTRYPOINT yarn start

View File

@ -2,13 +2,18 @@ name: build-ci-images
on:
workflow_call:
secrets:
REGISTRY_LOGIN:
required: true
REGISTRY_PASSWORD:
required: true
jobs:
warp10-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
REGISTRY_PASSWORD: ${{ secrets.GITHUB_TOKEN }}
with:
name: warp10-ci
context: .
@ -16,22 +21,22 @@ jobs:
lfs: true
redis-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
REGISTRY_PASSWORD: ${{ secrets.GITHUB_TOKEN }}
with:
name: redis-ci
context: .
file: images/redis/Dockerfile
redis-replica-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
needs:
- redis-ci
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
REGISTRY_PASSWORD: ${{ secrets.GITHUB_TOKEN }}
with:
name: redis-replica-ci
context: .github/docker/redis-replica
@ -42,21 +47,28 @@ jobs:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2.3.4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@v1.6.0
- name: Login to GitHub Registry
uses: docker/login-action@v3
uses: docker/login-action@v1.10.0
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ github.token }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Login to Scality Registry
uses: docker/login-action@v1.10.0
with:
registry: registry.scality.com
username: ${{ secrets.REGISTRY_LOGIN }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build and push vault Image
uses: docker/build-push-action@v5
uses: docker/build-push-action@v2.7.0
with:
push: true
context: .github/docker/vault

View File

@ -7,10 +7,9 @@ on:
jobs:
build-dev:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
secrets: inherit
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
registry: registry.scality.com
namespace: utapi-dev
name: utapi

View File

@ -15,9 +15,11 @@ on:
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
secrets: inherit
with:
registry: registry.scality.com
namespace: utapi
name: warp10
context: .
file: images/warp10/Dockerfile
@ -29,11 +31,11 @@ jobs:
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
- uses: softprops/action-gh-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
name: Release registry.scality.com/utapi/warp10:${{ github.event.inputs.tag }}-warp10
tag_name: ${{ github.event.inputs.tag }}-warp10
generate_release_notes: false
target_commitish: ${{ github.sha }}

View File

@ -22,10 +22,12 @@ on:
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
uses: scality/workflows/.github/workflows/docker-build.yaml@v1
secrets: inherit
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
registry: registry.scality.com
namespace: utapi
name: utapi
context: .
file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }}
@ -35,9 +37,9 @@ jobs:
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
- uses: softprops/action-gh-release@v1
env:
GITHUB_TOKEN: ${{ github.token }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }}

View File

@ -4,32 +4,23 @@ name: tests
on:
push:
branches-ignore:
- 'development/**'
- 'development/**'
workflow_dispatch:
inputs:
debug:
description: Debug (enable the ability to SSH to runners)
type: boolean
required: false
default: 'false'
connection-timeout-m:
type: number
required: false
description: Timeout for ssh connection to worker (minutes)
default: 30
jobs:
build-ci:
uses: ./.github/workflows/build-ci.yaml
secrets:
REGISTRY_LOGIN: ${{ secrets.REGISTRY_LOGIN }}
REGISTRY_PASSWORD: ${{ secrets.REGISTRY_PASSWORD }}
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
@ -88,7 +79,7 @@ jobs:
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
image: bitnami/redis-sentinel:6.2
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
@ -119,19 +110,24 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
- uses: actions/setup-python@v2
with:
python-version: '3.9'
cache: pip
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip
- name: Install python deps
run: pip install -r requirements.txt
run: |
pip install requests
pip install redis
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: ${{ matrix.test.name }}
@ -165,7 +161,7 @@ jobs:
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
image: bitnami/redis-sentinel:6.2
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
@ -208,19 +204,24 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
- uses: actions/setup-python@v2
with:
python-version: '3.9'
cache: pip
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip
- name: Install python deps
run: pip install -r requirements.txt
run: |
pip install requests
pip install redis
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 for 60 seconds
@ -232,16 +233,9 @@ jobs:
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: failure()
tests-v2-without-sensision:
needs:
@ -287,7 +281,7 @@ jobs:
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
image: bitnami/redis-sentinel:6.2
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
@ -329,19 +323,24 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
- uses: actions/setup-python@v2
with:
python-version: '3.9'
cache: pip
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip
- name: Install python deps
run: pip install -r requirements.txt
run: |
pip install requests
pip install redis
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 a little bit
@ -349,13 +348,6 @@ jobs:
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: failure()

View File

@ -27,7 +27,7 @@ x-models:
services:
redis-0:
image: redis:7.2.4
image: redis:6
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
@ -35,7 +35,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:7.2.4
image: redis:6
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
@ -43,7 +43,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:7.2.4
image: redis:6
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379

View File

@ -2,10 +2,11 @@
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
Docker images are hosted on [registry.scality.com](registry.scality.com).
Utapi has two namespaces there:
* Namespace: ghcr.io/scality/utapi
* Production Namespace: registry.scality.com/utapi
* Dev Namespace: registry.scality.com/utapi-dev
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
@ -17,8 +18,8 @@ Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
docker pull registry.scality.com/utapi-dev/utapi:<commit hash>
docker pull registry.scality.com/utapi/utapi:<tag>
```
## Release Process

View File

@ -1,4 +1,4 @@
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
FROM registry.scality.com/federation/nodesvc-base:7.10.5.0
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json

1
images/warp10/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
*.jar filter=lfs diff=lfs merge=lfs -text

View File

@ -13,7 +13,7 @@ RUN apk add zip unzip build-base \
&& cd .. \
&& go build -a -o /usr/local/go/warp10_sensision_exporter
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
FROM registry.scality.com/utapi/warp10:2.8.1-95-g73e7de80
# Override baked in version
# Remove when updating to a numbered release

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:389d2135867c399a389901460c5f2cc09f4857d0c6d08632c2638c25fb150c46
size 15468553

View File

@ -1,13 +1,35 @@
/* eslint-disable no-bitwise */
const assert = require('assert');
const fs = require('fs');
const path = require('path');
/**
* Reads from a config file and returns the content as a config object
*/
class Config {
constructor(config) {
this.component = config.component;
constructor() {
/*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
this._basePath = path.resolve(__dirname, '..');
this.path = `${this._basePath}/config.json`;
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
this.path = process.env.UTAPI_CONFIG_FILE;
}
// Read config automatically
this._getConfig();
}
_getConfig() {
let config;
try {
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
config = JSON.parse(data);
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
this.port = 9500;
if (config.port !== undefined) {
@ -93,26 +115,18 @@ class Config {
}
}
if (config.vaultclient) {
// Instance passed from outside
this.vaultclient = config.vaultclient;
this.vaultd = null;
} else {
// Connection data
this.vaultclient = null;
this.vaultd = {};
if (config.vaultd) {
if (config.vaultd.port !== undefined) {
assert(Number.isInteger(config.vaultd.port)
&& config.vaultd.port > 0,
'bad config: vaultd port must be a positive integer');
this.vaultd.port = config.vaultd.port;
}
if (config.vaultd.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.vaultd.host = config.vaultd.host;
}
this.vaultd = {};
if (config.vaultd) {
if (config.vaultd.port !== undefined) {
assert(Number.isInteger(config.vaultd.port)
&& config.vaultd.port > 0,
'bad config: vaultd port must be a positive integer');
this.vaultd.port = config.vaultd.port;
}
if (config.vaultd.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.vaultd.host = config.vaultd.host;
}
}
@ -127,11 +141,12 @@ class Config {
const { key, cert, ca } = config.certFilePaths
? config.certFilePaths : {};
if (key && cert) {
const keypath = key;
const certpath = cert;
const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
const certpath = (cert[0] === '/')
? cert : `${this._basePath}/${cert}`;
let capath;
if (ca) {
capath = ca;
capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`);
}
@ -157,13 +172,8 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}
if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}
return config;
}
}
module.exports = Config;
module.exports = new Config();

View File

@ -6,6 +6,8 @@ const async = require('async');
const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const Vault = require('./Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -21,6 +23,7 @@ class ListMetrics {
constructor(metric, component) {
this.metric = metric;
this.service = component;
this.vault = new Vault(config);
}
/**
@ -80,10 +83,9 @@ class ListMetrics {
const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
@ -122,11 +124,10 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}

View File

@ -16,19 +16,15 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex {
constructor(config) {
this._enabled = false;
this._schedule = REINDEX_SCHEDULE;
this._redis = {
this._sentinel = {
host: '127.0.0.1',
port: 16379,
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
};
this._bucketd = {
host: '127.0.0.1',
@ -46,13 +42,14 @@ class UtapiReindex {
if (config && config.password) {
this._password = config.password;
}
if (config && config.redis) {
if (config && config.sentinel) {
const {
name, sentinelPassword, sentinels,
} = config.redis;
this._redis.name = name || this._redis.name;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._redis.sentinels = sentinels || this._redis.sentinels;
host, port, name, sentinelPassword,
} = config.sentinel;
this._sentinel.host = host || this._sentinel.host;
this._sentinel.port = port || this._sentinel.port;
this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
}
if (config && config.bucketd) {
const { host, port } = config.bucketd;
@ -64,16 +61,17 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
this._requestLogger = this._log.newRequestLogger();
}
_getRedisClient() {
const client = new RedisClient({
sentinels: this._redis.sentinels,
name: this._redis.name,
sentinelPassword: this._redis.sentinelPassword,
sentinels: [{
host: this._sentinel.host,
port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password,
});
client.connect();
@ -88,18 +86,17 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY);
}
_buildFlags(sentinel) {
_buildFlags() {
const flags = {
/* eslint-disable camelcase */
sentinel_ip: sentinel.host,
sentinel_port: sentinel.port,
sentinel_cluster_name: this._redis.name,
sentinel_ip: this._sentinel.host,
sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._sentinel.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
};
if (this._redis.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword;
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}
/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
@ -108,15 +105,11 @@ class UtapiReindex {
opts.push(name);
opts.push(flags[flag]);
});
if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}
_runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(remainingSentinels.shift());
_runScript(path, done) {
const flags = this._buildFlags();
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => {
@ -143,17 +136,6 @@ class UtapiReindex {
statusCode: code,
script: path,
});
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else {
this._requestLogger.info('script exited successfully', {
statusCode: code,
@ -164,11 +146,6 @@ class UtapiReindex {
});
}
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock()

View File

@ -14,15 +14,6 @@ class UtapiRequest {
this._datastore = null;
this._requestQuery = null;
this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
}
/**

View File

@ -1,21 +1,16 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests
import redis
import json
import ast
import sys
from threading import Thread
import time
import urllib
import re
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
import argparse
def get_options():
parser = argparse.ArgumentParser()
@ -34,19 +29,8 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password
r = redis.Redis(
host=ip,
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
r = redis.Redis(host=ip, port=port, db=0, password=password)
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)

View File

@ -1,6 +1,5 @@
import argparse
import concurrent.futures as futures
import functools
import itertools
import json
import logging
@ -9,9 +8,9 @@ import re
import sys
import time
import urllib
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
import redis
import requests
@ -25,9 +24,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -36,38 +32,9 @@ def get_options():
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
return parser.parse_args()
def chunks(iterable, size):
it = iter(iterable)
@ -82,7 +49,7 @@ def _encoded(func):
return urllib.parse.quote(val.encode('utf-8'))
return inner
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])
Bucket = namedtuple('Bucket', ['userid', 'name'])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
@ -94,21 +61,15 @@ class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient:
'''Performs Listing calls against bucketd'''
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__url_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
def __init__(self, bucketd_addr=None, max_retries=2):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()
def _do_req(self, url, check_500=True, **kwargs):
@ -140,7 +101,7 @@ class BucketDClient:
parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
@ -153,9 +114,6 @@ class BucketDClient:
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
@ -177,37 +135,7 @@ class BucketDClient:
else:
is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def list_buckets(self, name = None):
def get_next_marker(p):
if p is None:
@ -219,24 +147,19 @@ class BucketDClient:
'maxKeys': 1000,
'marker': get_next_marker
}
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)
bucket = Bucket(*match.groups())
if name is None or bucket.name == name:
buckets.append(bucket)
if buckets:
yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@ -273,12 +196,18 @@ class BucketDClient:
upload_id=key['value']['UploadId']))
return keys
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
def _sum_objects(self, bucket, listing):
count = 0
total_size = 0
last_key = None
try:
for obj in listing:
last_master = None
last_size = None
for status_code, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
if contents is None:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
raise InvalidListing(bucket)
for obj in contents:
count += 1
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
@ -287,51 +216,39 @@ class BucketDClient:
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)
is_latest = obj['key'] != last_key
last_key = obj['key']
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue
count += 1
total_size += size
except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size
# If versioned, subtract the size of the master to avoid double counting
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
_log.debug('Detected versioned key: %s - subtracting master size: %i'% (
obj['key'],
last_size,
))
total_size -= last_size
count -= 1
last_master = None
def _extract_listing(self, key, listing):
for status_code, payload in listing:
contents = payload[key] if isinstance(payload, dict) else payload
if contents is None:
raise InvalidListing('')
for obj in contents:
yield obj
# Only save master versions
elif '\x00' not in obj['key']:
last_master = obj['key']
last_size = size
return count, total_size
def count_bucket_contents(self, bucket):
def get_key_marker(p):
if p is None:
def get_next_marker(p):
if p is None or len(p) == 0:
return ''
return p.get('NextKeyMarker', '')
def get_vid_marker(p):
if p is None:
return ''
return p.get('NextVersionIdMarker', '')
return p[-1].get('key', '')
params = {
'listingType': 'DelimiterVersions',
'listingType': 'Basic',
'maxKeys': 1000,
'keyMarker': get_key_marker,
'versionIdMarker': get_vid_marker,
'gt': get_next_marker,
}
listing = self._list_bucket(bucket.name, **params)
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
return BucketContents(
bucket=bucket,
obj_count=count,
@ -339,8 +256,7 @@ class BucketDClient:
)
def count_mpu_parts(self, mpu):
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
def get_prefix(p):
if p is None:
@ -360,31 +276,13 @@ class BucketDClient:
'listingType': 'Delimiter',
}
listing = self._list_bucket(shadow_bucket_name, **params)
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
return BucketContents(
bucket=shadow_bucket,
bucket=mpu.bucket._replace(name=_bucket),
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket):
'''
@ -424,16 +322,9 @@ def get_redis_client(options):
host=options.sentinel_ip,
port=options.sentinel_port,
db=0,
password=options.redis_password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
password=options.redis_password
)
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
return redis.Redis(
host=ip,
port=port,
@ -467,24 +358,16 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__':
options = get_options()
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator:
for batch in bucket_client.list_buckets(options.bucket):
bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()):
@ -503,84 +386,51 @@ if __name__ == '__main__':
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = recorded_buckets.difference(observed_buckets)
_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket is None:
stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket not in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
else:
stale_buckets = set()
_log.info('Found %s stale buckets' % len(stale_buckets))
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket is None:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()

View File

@ -7,6 +7,7 @@ const { Clustering, errors, ipCheck } = require('arsenal');
const arsenalHttps = require('arsenal').https;
const { Logger } = require('werelogs');
const config = require('./Config');
const routes = require('../router/routes');
const Route = require('../router/Route');
const Router = require('../router/Router');
@ -27,12 +28,7 @@ class UtapiServer {
constructor(worker, port, datastore, logger, config) {
this.worker = worker;
this.port = port;
this.vault = config.vaultclient;
if (!this.vault) {
const Vault = require('./Vault');
this.vault = new Vault(config);
}
this.router = new Router(config, this.vault);
this.router = new Router(config);
this.logger = logger;
this.datastore = datastore;
this.server = null;
@ -75,7 +71,6 @@ class UtapiServer {
req.socket.setNoDelay();
const { query, path, pathname } = url.parse(req.url, true);
const utapiRequest = new UtapiRequest()
.setVault(this.vault)
.setRequest(req)
.setLog(this.logger.newRequestLogger())
.setResponse(res)
@ -219,7 +214,8 @@ class UtapiServer {
* @property {object} params.log - logger configuration
* @return {undefined}
*/
function spawn(config) {
function spawn(params) {
Object.assign(config, params);
const {
workers, redis, log, port,
} = config;

View File

@ -23,6 +23,10 @@
"healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"]
},
"vaultd": {
"host": "127.0.0.1",
"port": 8500
},
"cacheBackend": "memory",
"development": false,
"nodeId": "single_node",

View File

@ -2,8 +2,6 @@ const fs = require('fs');
const path = require('path');
const Joi = require('@hapi/joi');
const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const {
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
@ -73,6 +71,7 @@ class Config {
constructor(overrides) {
this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this._defaultsPath = path.join(__dirname, 'defaults.json');
this.host = undefined;
this.port = undefined;
@ -90,11 +89,6 @@ class Config {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
}
Object.assign(this, parsedConfig);
werelogs.configure({
level: Config.logging.level,
dump: Config.logging.dumpLevel,
});
}
static _readFile(path, encoding = 'utf-8') {
@ -119,7 +113,7 @@ class Config {
}
_loadDefaults() {
return defaults;
return Config._readJSON(this._defaultsPath);
}
_loadUserConfig() {
@ -198,10 +192,6 @@ class Config {
`${prefix}_SENTINEL_PASSWORD`,
config.sentinelPassword,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
} else {
redisConf.host = _loadFromEnv(
`${prefix}_HOST`,

View File

@ -6,8 +6,7 @@ const BackOff = require('backo');
const { whilst } = require('async');
const errors = require('./errors');
const { LoggerContext } = require('./utils/log');
const { asyncOrCallback } = require('./utils/func');
const { LoggerContext, asyncOrCallback } = require('./utils');
const moduleLogger = new LoggerContext({
module: 'redis',

View File

@ -1,13 +1,13 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 's3_utapi_http_requests_total',
name: 'utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 's3_utapi_http_request_duration_seconds',
name: 'utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s

View File

@ -68,20 +68,20 @@ class BaseTask extends Process {
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `s3_utapi${taskNameSnake}_duration_seconds`,
name: `utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_attempts_total`,
help: `Total number of attempts to execute the ${taskName} task`,
name: `utapi${taskNameSnake}_attempts_total`,
help: `Number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_failures_total`,
help: `Total number of failures executing the ${taskName} task`,
name: `utapi${taskNameSnake}_failures_total`,
help: `Number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});

View File

@ -24,14 +24,14 @@ class CreateCheckpoint extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_checkpoint_created_total',
help: 'Total number of checkpoints created',
name: 'utapi_create_checkpoint_created_total',
help: 'Number of checkpoints created',
labelNames: ['origin', 'containerName'],
});
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 's3_utapi_create_checkpoint_last_checkpoint_seconds',
name: 'utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {

View File

@ -24,14 +24,14 @@ class CreateSnapshot extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_snapshot_created_total',
help: 'Total number of snapshots created',
name: 'utapi_create_snapshot_created_total',
help: 'Number of snapshots created',
labelNames: ['origin', 'containerName'],
});
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 's3_utapi_create_snapshot_last_snapshot_seconds',
name: 'utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {

View File

@ -52,31 +52,31 @@ class MonitorDiskUsage extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const isLocked = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_is_locked',
name: 'utapi_monitor_disk_usage_is_locked',
help: 'Indicates whether the monitored warp 10 has had writes disabled',
labelNames: ['origin', 'containerName'],
});
const leveldbBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_leveldb_bytes',
name: 'utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10 leveldb',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_datalog_bytes',
name: 'utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'],
});
const hardLimitRatio = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_ratio',
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
help: 'Percent of the hard limit used by warp 10',
labelNames: ['origin', 'containerName'],
});
const hardLimitSetting = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_bytes',
name: 'utapi_monitor_disk_usage_hard_limit_bytes',
help: 'The hard limit setting in bytes',
labelNames: ['origin', 'containerName'],
});

View File

@ -32,26 +32,26 @@ class IngestShardTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const ingestedTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_ingest_total',
help: 'Total number of metrics ingested',
name: 'utapi_ingest_shard_task_ingest_total',
help: 'Number of metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedSlow = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_slow_total',
help: 'Total number of slow metrics ingested',
name: 'utapi_ingest_shard_task_slow_total',
help: 'Number of slow metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedShards = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_ingest_total',
help: 'Total number of metric shards ingested',
name: 'utapi_ingest_shard_task_shard_ingest_total',
help: 'Number of metric shards ingested',
labelNames: ['origin', 'containerName'],
});
const shardAgeTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_age_total',
help: 'Total aggregated age of shards',
name: 'utapi_ingest_shard_task_shard_age_total',
help: 'Aggregated age of shards',
labelNames: ['origin', 'containerName'],
});

View File

@ -24,8 +24,8 @@ class RepairTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_repair_task_created_total',
help: 'Total number of repair records created',
name: 'utapi_repair_task_created_total',
help: 'Number of repair records created',
labelNames: ['origin', 'containerName'],
});

View File

@ -1,6 +1,14 @@
const werelogs = require('werelogs');
const config = require('../config');
const { comprehend } = require('./func');
const loggerConfig = {
level: config.logging.level,
dump: config.logging.dumpLevel,
};
werelogs.configure(loggerConfig);
const rootLogger = new werelogs.Logger('Utapi');
class LoggerContext {
@ -70,6 +78,8 @@ class LoggerContext {
}
}
rootLogger.debug('logger initialized', { loggerConfig });
function buildRequestLogger(req) {
let reqUids = [];
if (req.headers['x-scal-request-uids'] !== undefined) {

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { auth, policies } = require('arsenal');
const vaultclient = require('vaultclient');
const config = require('../config');
const errors = require('../errors');
/**
@ -8,17 +9,9 @@ const errors = require('../errors');
*/
class VaultWrapper extends auth.Vault {
create(config) {
if (config.vaultd.host) {
return new VaultWrapper(config);
}
return null;
}
constructor(options) {
let client;
const { host, port } = options.vaultd;
const vaultclient = require('vaultclient');
if (options.tls) {
const { key, cert, ca } = options.tls;
client = new vaultclient.Client(host, port, true, key, cert,
@ -126,7 +119,7 @@ class VaultWrapper extends auth.Vault {
}
}
const vault = VaultWrapper.create(config);
const vault = new VaultWrapper(config);
auth.setHandler(vault);
module.exports = {

View File

@ -3,7 +3,7 @@
"engines": {
"node": ">=16"
},
"version": "8.1.15",
"version": "8.1.9",
"description": "API for tracking resource utilization and reporting metrics",
"main": "index.js",
"repository": {
@ -19,12 +19,13 @@
"dependencies": {
"@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.14",
"arsenal": "git+https://git.yourcmc.ru/vitalif/zenko-arsenal.git#development/8.1",
"arsenal": "git+https://github.com/scality/Arsenal#8.1.64",
"async": "^3.2.0",
"aws-sdk": "^2.1005.0",
"aws4": "^1.8.0",
"backo": "^1.1.0",
"body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient#8.1.5",
"byte-size": "^7.0.0",
"commander": "^5.1.0",
"cron-parser": "^2.15.0",
@ -37,16 +38,17 @@
"needle": "^2.5.0",
"node-schedule": "^1.3.2",
"oas-tools": "^2.2.2",
"prom-client": "14.2.0",
"prom-client": "^13.1.0",
"uuid": "^3.3.2",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1"
"vaultclient": "scality/vaultclient#8.2.6",
"werelogs": "scality/werelogs#8.1.0"
},
"devDependencies": {
"eslint": "^8.14.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
"eslint": "6.0.1",
"eslint-config-airbnb": "17.1.0",
"eslint-config-scality": "scality/Guidelines#8.2.0",
"eslint-plugin-import": "^2.18.0",
"mocha": ">=3.1.2",
"mocha": "^3.0.2",
"nodemon": "^2.0.4",
"protobufjs": "^6.10.1",
"sinon": "^9.0.2"

View File

@ -1,2 +0,0 @@
redis==5.0.3
requests==2.31.0

View File

@ -3,16 +3,17 @@ const assert = require('assert');
const url = require('url');
const { auth, errors, policies } = require('arsenal');
const safeJsonParse = require('../utils/safeJsonParse');
const Vault = require('../lib/Vault');
class Router {
/**
* @constructor
* @param {Config} config - Config instance
*/
constructor(config, vault) {
constructor(config) {
this._service = config.component;
this._routes = {};
this._vault = vault;
this._vault = new Vault(config);
}
/**

View File

@ -1,21 +1,4 @@
const fs = require('fs');
const path = require('path');
const Config = require('./lib/Config');
const config = require('./lib/Config');
const server = require('./lib/server');
/*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
const cfgpath = process.env.UTAPI_CONFIG_FILE || (__dirname+'/config.json');
let cfg;
try {
cfg = JSON.parse(fs.readFileSync(cfgpath, { encoding: 'utf-8' }));
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
cfg.component = 's3';
server(new Config(cfg));
server(Object.assign({}, config, { component: 's3' }));

View File

@ -41,7 +41,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
assert(lockSpy.calledOnce);
assert(unlockSpy.notCalled);
assert(execStub.calledOnce);
await assertMetricValue('s3_utapi_monitor_disk_usage_hard_limit_bytes', 1);
await assertMetricValue('utapi_monitor_disk_usage_hard_limit_bytes', 1);
});
it('should trigger a database unlock if below the limit', async () => {

View File

@ -15,7 +15,7 @@ class CustomTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const foo = new promClient.Gauge({
name: 's3_utapi_custom_task_foo_total',
name: 'utapi_custom_task_foo_total',
help: 'Count of foos',
labelNames: ['origin', 'containerName'],
});
@ -58,26 +58,26 @@ describe('Test BaseTask metrics', () => {
it('should push metrics for a task execution', async () => {
await task.execute();
const timeValues = await getMetricValues('s3_utapi_custom_task_duration_seconds');
const timeValues = await getMetricValues('utapi_custom_task_duration_seconds');
assert.strictEqual(timeValues.length, 1);
const attemptsValues = await getMetricValues('s3_utapi_custom_task_attempts_total');
const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, []);
});
it('should push metrics for a failed task execution', async () => {
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
await task.execute();
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
});
it('should allow custom handlers to be registered', async () => {
await task.execute();
const fooValues = await getMetricValues('s3_utapi_custom_task_foo_total');
const fooValues = await getMetricValues('utapi_custom_task_foo_total');
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
});
});

View File

@ -80,7 +80,7 @@ describe('Test CreateCheckpoint', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_create_checkpoint_created_total', series.length);
await assertMetricValue('utapi_create_checkpoint_created_total', series.length);
});
it('should only include events not in an existing checkpoint', async () => {

View File

@ -88,7 +88,7 @@ describe('Test CreateSnapshot', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_create_snapshot_created_total', series.length);
await assertMetricValue('utapi_create_snapshot_created_total', series.length);
});
it('should create a snapshot from more than one checkpoint', async () => {

View File

@ -69,8 +69,8 @@ describe('Test MonitorDiskUsage', () => {
const expectedTotalSize = expectedSingleSize * 2;
assert.strictEqual(task.usage, expectedTotalSize);
// Should equal the usage minus the empty datalog
await assertMetricValue('s3_utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
await assertMetricValue('s3_utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
await assertMetricValue('utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
await assertMetricValue('utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
});
});

View File

@ -92,9 +92,9 @@ describe('Test IngestShards', function () {
'@utapi/decodeEvent',
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
await assertMetricValue('s3_utapi_ingest_shard_task_shard_ingest_total', 1);
const metricValues = await getMetricValues('s3_utapi_ingest_shard_task_shard_age_total');
await assertMetricValue('utapi_ingest_shard_task_ingest_total', events.length);
await assertMetricValue('utapi_ingest_shard_task_shard_ingest_total', 1);
const metricValues = await getMetricValues('utapi_ingest_shard_task_shard_age_total');
assert.strictEqual(metricValues.length, 1);
const [metric] = metricValues;
assert(metric.value > 0);
@ -118,7 +118,7 @@ describe('Test IngestShards', function () {
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
await assertMetricValue('utapi_ingest_shard_task_ingest_total', events.length);
});
it('should ingest old metrics as repair', async () => {
@ -138,7 +138,7 @@ describe('Test IngestShards', function () {
'@utapi/decodeEvent',
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_slow_total', events.length);
await assertMetricValue('utapi_ingest_shard_task_slow_total', events.length);
});
it('should strip the event uuid during ingestion', async () => {

View File

@ -80,7 +80,7 @@ describe('Test Repair', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_repair_task_created_total', series.length);
await assertMetricValue('utapi_repair_task_created_total', series.length);
});
it('should only include events not in an existing correction', async () => {

View File

@ -39,7 +39,7 @@ function getMetricResponse(schemaKey) {
return response;
}
function assertMetrics(schemaKey, metricName, props, done) {
function assertMetrics(schemaKey, metricName, props, isNegativeValue, done) {
const timestamp = new Date().setMinutes(0, 0, 0);
const timeRange = [timestamp, timestamp];
const expectedRes = getMetricResponse(schemaKey);
@ -51,6 +51,16 @@ function assertMetrics(schemaKey, metricName, props, done) {
datastore,
logger,
(err, res) => {
if (isNegativeValue) {
assert(err.is.InternalError);
assert.strictEqual(
err.description,
'Utapi is in a transient state for this time period as '
+ 'metrics are being collected. Please try again in a few '
+ 'minutes.',
);
return done();
}
assert.strictEqual(err, null);
// overwrite operations metrics
if (expectedResProps.operations) {
@ -88,12 +98,13 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
if (keyIndex === 'storageUtilized' || keyIndex === 'numberOfObjects') {
key = generateStateKey(schemaObject, keyIndex);
val = isNegativeValue ? -1024 : 1024;
props[metricindex] = isNegativeValue ? [0, 0] : [val, val];
props[metricindex] = [val, val];
memBackend.zadd(key, timestamp, val, () =>
assertMetrics(
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
} else if (keyIndex === 'incomingBytes' || keyIndex === 'outgoingBytes') {
@ -105,6 +116,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
} else {
@ -117,6 +129,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
}
@ -132,6 +145,7 @@ Object.keys(metricLevels).forEach(schemaKey => {
schemaKey,
resourceNames[schemaKey],
null,
false,
done,
));

View File

@ -114,21 +114,20 @@ describe('Test middleware', () => {
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
await assertMetricValue('s3_utapi_http_requests_total', 1);
const durationMetric = 's3_utapi_http_request_duration_seconds';
const duration = await getMetricValues(durationMetric);
await assertMetricValue('utapi_http_requests_total', 1);
const duration = await getMetricValues('utapi_http_request_duration_seconds');
// 14 defined buckets + 1 for Infinity
assert.strictEqual(
duration.filter(metric => metric.metricName === `${durationMetric}_bucket`).length,
duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_bucket').length,
15,
);
const count = duration.filter(metric => metric.metricName === `${durationMetric}_count`);
const count = duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_count');
assert.deepStrictEqual(count, [{
labels: {
action: 'listMetrics',
code: 200,
},
metricName: `${durationMetric}_count`,
metricName: 'utapi_http_request_duration_seconds_count',
value: 1,
}]);
assert.strictEqual(count[0].value, 1);
@ -138,7 +137,7 @@ describe('Test middleware', () => {
const req = templateRequest();
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
assert.rejects(() => getMetricValues('s3_utapi_http_requests_total'));
assert.rejects(() => getMetricValues('utapi_http_requests_total'));
});
});
});

View File

@ -112,17 +112,6 @@ class BucketD {
return body;
}
_getBucketVersionResponse(bucketName) {
const body = {
CommonPrefixes: [],
IsTruncated: false,
Versions: (this._bucketContent[bucketName] || [])
// patch in a versionId to more closely match the real response
.map(entry => ({ ...entry, versionId: 'null' })),
};
return body;
}
_getShadowBucketOverviewResponse(bucketName) {
const mpus = (this._bucketContent[bucketName] || []).map(o => ({
key: o.key,
@ -148,8 +137,6 @@ class BucketD {
|| req.query.listingType === 'Delimiter'
) {
req.body = this._getBucketResponse(bucketName);
} else if (req.query.listingType === 'DelimiterVersions') {
req.body = this._getBucketVersionResponse(bucketName);
}
// v2 reindex uses `Basic` listing type for everything

5229
yarn.lock Normal file

File diff suppressed because it is too large Load Diff