Compare commits

..

2 Commits

175 changed files with 7982 additions and 8775 deletions

View File

@ -1,3 +1,4 @@
node_modules/
**/node_modules/
.git
eve/

View File

@ -1,25 +1,7 @@
{
"extends": "scality",
"env": {
"es6": true
},
"parserOptions": {
"ecmaVersion": 9
},
"rules": {
"no-underscore-dangle": "off",
"implicit-arrow-linebreak" : "off",
"import/extensions": 0,
"prefer-spread": 0,
"no-param-reassign": 0,
"array-callback-return": 0
},
"settings": {
"import/resolver": {
"node": {
"paths": ["/utapi/node_modules", "node_modules"]
}
}
"extends": "scality",
"rules": {
"no-underscore-dangle": "off",
"implicit-arrow-linebreak" : "off"
}
}

View File

@ -1,14 +0,0 @@
# Creating this image for the CI as GitHub Actions
# is unable to overwrite the entrypoint
ARG REDIS_IMAGE="redis:latest"
FROM ${REDIS_IMAGE}
ENV REDIS_LISTEN_PORT 6380
ENV REDIS_MASTER_HOST redis
ENV REDIS_MASTER_PORT_NUMBER 6379
ENTRYPOINT redis-server \
--port ${REDIS_LISTEN_PORT} \
--slaveof ${REDIS_MASTER_HOST} ${REDIS_MASTER_PORT_NUMBER}

View File

@ -1,7 +0,0 @@
FROM ghcr.io/scality/vault:c2607856
ENV VAULT_DB_BACKEND LEVELDB
RUN chmod 400 tests/utils/keyfile
ENTRYPOINT yarn start

View File

@ -1,65 +0,0 @@
name: build-ci-images
on:
workflow_call:
jobs:
warp10-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: warp10-ci
context: .
file: images/warp10/Dockerfile
lfs: true
redis-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-ci
context: .
file: images/redis/Dockerfile
redis-replica-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
needs:
- redis-ci
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-replica-ci
context: .github/docker/redis-replica
build-args: |
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
vault-ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to GitHub Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ github.token }}
- name: Build and push vault Image
uses: docker/build-push-action@v5
with:
push: true
context: .github/docker/vault
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
cache-from: type=gha,scope=vault
cache-to: type=gha,mode=max,scope=vault

View File

@ -1,16 +0,0 @@
name: build-dev-image
on:
push:
branches-ignore:
- 'development/**'
jobs:
build-dev:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}

View File

@ -1,39 +0,0 @@
name: release-warp10
on:
workflow_dispatch:
inputs:
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: true
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets: inherit
with:
name: warp10
context: .
file: images/warp10/Dockerfile
tag: ${{ github.event.inputs.tag }}
lfs: true
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
tag_name: ${{ github.event.inputs.tag }}-warp10
generate_release_notes: false
target_commitish: ${{ github.sha }}

View File

@ -1,45 +0,0 @@
name: release
on:
workflow_dispatch:
inputs:
dockerfile:
description: Dockerfile to build image from
type: choice
options:
- images/nodesvc-base/Dockerfile
- Dockerfile
required: true
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: false
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
context: .
file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }}
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }}
generate_release_notes: true
target_commitish: ${{ github.sha }}

View File

@ -1,361 +0,0 @@
---
name: tests
on:
push:
branches-ignore:
- 'development/**'
workflow_dispatch:
inputs:
debug:
description: Debug (enable the ability to SSH to runners)
type: boolean
required: false
default: 'false'
connection-timeout-m:
type: number
required: false
description: Timeout for ssh connection to worker (minutes)
default: 30
jobs:
build-ci:
uses: ./.github/workflows/build-ci.yaml
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: run static analysis tools on markdown
run: yarn run lint_md
- name: run static analysis tools on code
run: yarn run lint
tests-v1:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run unit tests
command: yarn test
env:
UTAPI_METRICS_ENABLED: 'true'
- name: run v1 client tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
env: {}
- name: run v1 server tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:server
env: {}
- name: run v1 cron tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:cron
env: {}
- name: run v1 interval tests
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:interval
env: {}
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
ports:
- 4802:4802
- 8082:8082
- 9718:9718
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
tests-v2-with-vault:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 for 60 seconds
run: sleep 60
- name: run v2 functional tests
run: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
tests-v2-without-sensision:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run v2 soft limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:softLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: run v2 hard limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:hardLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 a little bit
run: sleep 60
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}

View File

@ -1,31 +1,21 @@
FROM node:16.13.2-buster-slim
FROM node:6-slim
WORKDIR /usr/src/app
COPY package.json yarn.lock /usr/src/app/
COPY package.json /usr/src/app
RUN apt-get update \
&& apt-get install -y \
curl \
gnupg2
RUN curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
RUN apt-get update \
&& apt-get install -y jq git python3 build-essential yarn --no-install-recommends \
&& yarn cache clean \
&& yarn install --frozen-lockfile --production --ignore-optional --network-concurrency=1 \
&& apt-get autoremove --purge -y python3 git build-essential \
&& apt-get install -y jq --no-install-recommends \
&& npm install --production \
&& rm -rf /var/lib/apt/lists/* \
&& yarn cache clean \
&& npm cache clear --force \
&& rm -rf ~/.node-gyp \
&& rm -rf /tmp/yarn-*
&& rm -rf /tmp/npm-*
# Keep the .git directory in order to properly report version
COPY . /usr/src/app
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
CMD [ "yarn", "start" ]
CMD [ "npm", "start" ]
EXPOSE 8100

View File

@ -1,13 +1,12 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateCheckpoint',
});
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
const task = new tasks.CreateCheckpoint();
task.setup()
.then(() => logger.info('Starting checkpoint creation'))

View File

@ -1,12 +1,11 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateSnapshot',
});
const task = new tasks.CreateSnapshot({ warp10: [warp10Clients[0]] });
const task = new tasks.CreateSnapshot();
task.setup()
.then(() => logger.info('Starting snapshot creation'))

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'MonitorDiskUsage',
});
const task = new tasks.MonitorDiskUsage({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting disk usage monitor'))
.then(() => task.start())
.then(() => logger.info('Disk usage monitor started'));

View File

@ -1,276 +0,0 @@
#! /usr/bin/env node
// TODO
// - deduplicate with Vault's seed script at https://github.com/scality/Vault/pull/1627
// - add permission boundaries to user when https://scality.atlassian.net/browse/VAULT-4 is implemented
const { errors } = require('arsenal');
const program = require('commander');
const werelogs = require('werelogs');
const async = require('async');
const { IAM } = require('aws-sdk');
const { version } = require('../package.json');
const systemPrefix = '/scality-internal/';
function generateUserPolicyDocument() {
return {
Version: '2012-10-17',
Statement: {
Effect: 'Allow',
Action: 'utapi:ListMetrics',
Resource: 'arn:scality:utapi:::*/*',
},
};
}
function createIAMClient(opts) {
return new IAM({
endpoint: opts.iamEndpoint,
});
}
function needsCreation(v) {
if (Array.isArray(v)) {
return !v.length;
}
return !v;
}
class BaseHandler {
constructor(serviceName, iamClient, log) {
this.serviceName = serviceName;
this.iamClient = iamClient;
this.log = log;
}
applyWaterfall(values, done) {
this.log.debug('applyWaterfall', { values, type: this.resourceType });
const v = values[this.resourceType];
if (needsCreation(v)) {
this.log.debug('creating', { v, type: this.resourceType });
return this.create(values)
.then(res =>
done(null, Object.assign(values, {
[this.resourceType]: res,
})))
.catch(done);
}
this.log.debug('conflicts check', { v, type: this.resourceType });
if (this.conflicts(v)) {
return done(errors.EntityAlreadyExists.customizeDescription(
`${this.resourceType} ${this.serviceName} already exists and conflicts with the expected value.`));
}
this.log.debug('nothing to do', { v, type: this.resourceType });
return done(null, values);
}
}
class UserHandler extends BaseHandler {
get resourceType() {
return 'user';
}
collect() {
return this.iamClient.getUser({
UserName: this.serviceName,
})
.promise()
.then(res => res.User);
}
create(allResources) {
return this.iamClient.createUser({
UserName: this.serviceName,
Path: systemPrefix,
})
.promise()
.then(res => res.User);
}
conflicts(u) {
return u.Path !== systemPrefix;
}
}
class PolicyHandler extends BaseHandler {
get resourceType() {
return 'policy';
}
collect() {
return this.iamClient.listPolicies({
MaxItems: 100,
OnlyAttached: false,
Scope: 'All',
})
.promise()
.then(res => res.Policies.find(p => p.PolicyName === this.serviceName));
}
create(allResources) {
const doc = generateUserPolicyDocument();
return this.iamClient.createPolicy({
PolicyName: this.serviceName,
PolicyDocument: JSON.stringify(doc),
Path: systemPrefix,
})
.promise()
.then(res => res.Policy);
}
conflicts(p) {
return p.Path !== systemPrefix;
}
}
class PolicyAttachmentHandler extends BaseHandler {
get resourceType() {
return 'policyAttachment';
}
collect() {
return this.iamClient.listAttachedUserPolicies({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AttachedPolicies)
}
create(allResources) {
return this.iamClient.attachUserPolicy({
PolicyArn: allResources.policy.Arn,
UserName: this.serviceName,
})
.promise();
}
conflicts(p) {
return false;
}
}
class AccessKeyHandler extends BaseHandler {
get resourceType() {
return 'accessKey';
}
collect() {
return this.iamClient.listAccessKeys({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AccessKeyMetadata)
}
create(allResources) {
return this.iamClient.createAccessKey({
UserName: this.serviceName,
})
.promise()
.then(res => res.AccessKey);
}
conflicts(a) {
return false;
}
}
function collectResource(v, done) {
v.collect()
.then(res => done(null, res))
.catch(err => {
if (err.code === 'NoSuchEntity') {
return done(null, null);
}
done(err);
});
}
function collectResourcesFromHandlers(handlers, cb) {
const tasks = handlers.reduce((acc, v) => ({
[v.resourceType]: done => collectResource(v, done),
...acc,
}), {});
async.parallel(tasks, cb);
}
function buildServiceUserHandlers(serviceName, client, log) {
return [
UserHandler,
PolicyHandler,
PolicyAttachmentHandler,
AccessKeyHandler,
].map(h => new h(serviceName, client, log));
}
function apply(client, serviceName, log, cb) {
const handlers = buildServiceUserHandlers(serviceName, client, log);
async.waterfall([
done => collectResourcesFromHandlers(handlers, done),
...handlers.map(h => h.applyWaterfall.bind(h)),
(values, done) => done(null, values.accessKey),
], cb);
}
function wrapAction(actionFunc, serviceName, options) {
werelogs.configure({
level: options.logLevel,
dump: options.logDumpLevel,
});
const log = new werelogs.Logger(process.argv[1]).newRequestLogger();
const client = createIAMClient(options);
actionFunc(client, serviceName, log, (err, data) => {
if (err) {
log.error('failed', {
data,
error: err,
});
if (err.EntityAlreadyExists) {
log.error(`run "${process.argv[1]} purge ${serviceName}" to fix.`);
}
process.exit(1);
}
log.info('success', { data });
process.exit();
});
}
program.version(version);
[
{
name: 'apply <service-name>',
actionFunc: apply,
},
].forEach(cmd => {
program
.command(cmd.name)
.option('--iam-endpoint <url>', 'IAM endpoint', 'http://localhost:8600')
.option('--log-level <level>', 'log level', 'info')
.option('--log-dump-level <level>', 'log level that triggers a dump of the debug buffer', 'error')
.action(wrapAction.bind(null, cmd.actionFunc));
});
const validCommands = program.commands.map(n => n._name);
// Is the command given invalid or are there too few arguments passed
if (!validCommands.includes(process.argv[2])) {
program.outputHelp();
process.stdout.write('\n');
process.exit(1);
} else {
program.parse(process.argv);
}

View File

@ -1,13 +1,12 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'IngestShard',
});
const task = new tasks.IngestShard({ warp10: warp10Clients });
const task = new tasks.IngestShard();
task.setup()
.then(() => logger.info('Starting shard ingestion'))

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'ManualAdjust',
});
const task = new tasks.ManualAdjust({ warp10: warp10Clients });
task.setup()
.then(() => logger.info('Starting manual adjustment'))
.then(() => task.start())
.then(() => logger.info('Manual adjustment started'));

View File

@ -1,14 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Migrate',
});
const task = new tasks.MigrateTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting utapi v1 => v2 migration'))
.then(() => task.start())
.then(() => logger.info('Migration started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Reindex',
});
const task = new tasks.ReindexTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting Reindex daemon'))
.then(() => task.start())
.then(() => logger.info('Reindex started'));

View File

@ -1,13 +1,12 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Repair',
});
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
const task = new tasks.RepairTask();
task.setup()
.then(() => logger.info('Starting Repair daemon'))

View File

@ -1,75 +0,0 @@
version: '3.8'
x-models:
warp10: &warp10
build:
context: .
dockerfile: ./images/warp10/Dockerfile
volumes: [ $PWD/warpscript:/usr/local/share/warpscript ]
warp10_env: &warp10_env
ENABLE_WARPSTUDIO: 'true'
ENABLE_SENSISION: 'true'
warpscript.repository.refresh: 1000
warpscript.maxops: 1000000000
warpscript.maxops.hard: 1000000000
warpscript.maxfetch: 1000000000
warpscript.maxfetch.hard: 1000000000
warpscript.extension.debug: io.warp10.script.ext.debug.DebugWarpScriptExtension
warpscript.maxrecursion: 1000
warpscript.repository.directory: /usr/local/share/warpscript
warpscript.extension.logEvent: io.warp10.script.ext.logging.LoggingWarpScriptExtension
redis: &redis
build:
context: .
dockerfile: ./images/redis/Dockerfile
services:
redis-0:
image: redis:7.2.4
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:7.2.4
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:7.2.4
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379
logfile ""
dir /tmp
sentinel announce-ip ${EXTERNAL_HOST}
sentinel announce-port 16379
sentinel monitor scality-s3 "${EXTERNAL_HOST}" 6379 1
EOF
redis-sentinel /tmp/sentinel.conf'
environment:
- HOST_IP="${EXTERNAL_HOST}"
ports:
- 16379:16379
warp10:
<< : *warp10
environment:
<< : *warp10_env
ports:
- 4802:4802
- 8081:8081
- 9718:9718
volumes:
- /tmp/warp10:/data
- '${PWD}/warpscript:/usr/local/share/warpscript'

View File

@ -1,42 +0,0 @@
# Utapi Release Plan
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
* Namespace: ghcr.io/scality/utapi
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
This allows those images to be used by developers, CI builds,
build chain and so on.
Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
```
## Release Process
To release a production image:
* Name the tag for the repository and Docker image.
* Use the `yarn version` command with the same tag to update `package.json`.
* Create a PR and merge the `package.json` change.
* Tag the repository using the same tag.
* [Force a build] using:
* A given branch that ideally matches the tag.
* The `release` stage.
* An extra property with the name `tag` and its value being the actual tag.
[Force a build]:
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force

133
eve/main.yml Normal file
View File

@ -0,0 +1,133 @@
---
version: 0.2
branches:
default:
stage: pre-merge
models:
- Git: &clone
name: Pull repo
repourl: '%(prop:git_reference)s'
shallow: True
retryFetch: True
haltOnFailure: True
- Workspace: &workspace
type: kube_pod
path: eve/workers/pod.yml
images:
aggressor: eve/workers/unit_and_feature_tests
warp10:
context: '.'
dockerfile: 'images/warp10/Dockerfile'
vault: eve/workers/mocks/vault
- Install: &install
name: install node modules
command: yarn install --frozen-lockfile
haltOnFailure: True
- Upload: &upload_artifacts
source: /artifacts
urls:
- "*"
stages:
pre-merge:
worker:
type: local
steps:
- MasterShellCommand:
name: Replace upstream image with `-ci` variant
command: "sed -i '/^FROM/ s/$/-ci/' %(prop:master_builddir)s/build/images/warp10/Dockerfile"
- TriggerStages:
name: trigger all the tests
stage_names:
- linting-coverage
- run-unit-tests
- run-client-tests
- run-server-tests
- run-cron-tests
- run-interval-tests
- run-v2-functional-tests
linting-coverage:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run static analysis tools on markdown
command: yarn run lint_md
- ShellCommand:
name: run static analysis tools on code
command: yarn run lint
run-unit-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run unit tests
command: yarn test
run-client-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run client tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
run-server-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run server tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
run-cron-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run cron tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
run-interval-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run interval tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
run-v2-functional-tests:
worker:
<< : *workspace
vars:
vault: enabled
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: Wait for Warp 10
command: |
bash -c "
set -ex
bash tests/utils/wait_for_local_port.bash 4802 60"
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
- ShellCommand:
name: run v2 functional tests
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
utapi:
filename: "/artifacts/setup_ft_test:v2.log"
follow: true
- Upload: *upload_artifacts

View File

@ -0,0 +1,7 @@
FROM node:alpine
ADD ./vault.js /usr/share/src/
WORKDIR /usr/share/src/
CMD node vault.js

View File

@ -0,0 +1,32 @@
const http = require('http');
const port = process.env.VAULT_PORT || 8500;
class Vault {
constructor() {
this._server = null;
}
static _onRequest(req, res) {
res.writeHead(200);
return res.end();
}
start() {
this._server = http.createServer(Vault._onRequest).listen(port);
}
end() {
this._server.close();
}
}
const vault = new Vault();
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, () => process.exit(0));
});
// eslint-disable-next-line no-console
console.log('Starting Vault Mock...');
vault.start();

67
eve/workers/pod.yml Normal file
View File

@ -0,0 +1,67 @@
apiVersion: v1
kind: Pod
metadata:
name: "utapi-test-pod"
spec:
activeDeadlineSeconds: 3600
restartPolicy: Never
terminationGracePeriodSeconds: 10
containers:
- name: aggressor
image: "{{ images.aggressor }}"
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: "2"
memory: 3Gi
volumeMounts:
- mountPath: /var/run/docker.sock
name: docker-socket
- name: artifacts
readOnly: false
mountPath: /artifacts
- name: warp10
image: "{{ images.warp10 }}"
command:
- sh
- -ce
- /init | tee -a /artifacts/warp10.log
env:
- name: standalone.port
value: '4802'
- name: warp.token.file
value: /opt/warp10/etc/ci.tokens
- name: warpscript.maxops
value: '10000000'
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1750m
memory: 3Gi
volumeMounts:
- name: artifacts
readOnly: false
mountPath: /artifacts
{% if vars.vault is defined and vars.vault == 'enabled' %}
- name: vault
image: "{{ images.vault }}"
resources:
requests:
cpu: 10m
memory: 64Mi
limits:
cpu: 50m
memory: 128Mi
{% endif %}
volumes:
- name: docker-socket
hostPath:
path: /var/run/docker.sock
type: Socket
- name: artifacts
emptyDir: {}

View File

@ -0,0 +1,34 @@
FROM buildpack-deps:jessie-curl
#
# Install apt packages needed by utapi and buildbot_worker
#
ENV LANG C.UTF-8
ENV NODE_VERSION 10.22.0
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update -qq \
&& cat /tmp/*packages.list | xargs apt-get install -y \
&& pip install pip==9.0.1 \
&& rm -rf /var/lib/apt/lists/* \
&& rm -f /tmp/*packages.list \
&& rm -f /etc/supervisor/conf.d/*.conf \
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
#
# Run buildbot-worker on startup through supervisor
#
ARG BUILDBOT_VERSION
RUN pip install buildbot-worker==$BUILDBOT_VERSION
RUN pip3 install requests
RUN pip3 install redis
ADD supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
ADD redis/sentinel.conf /etc/sentinel.conf
CMD ["supervisord", "-n"]

View File

@ -0,0 +1,11 @@
ca-certificates
git
libffi-dev
libssl-dev
python2.7
python2.7-dev
python-pip
sudo
supervisor
lsof
netcat

View File

@ -0,0 +1,35 @@
# Example sentinel.conf
# The port that this sentinel instance will run on
port 16379
# Specify the log file name. Also the empty string can be used to force
# Sentinel to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile ""
# dir <working-directory>
# Every long running process should have a well-defined working directory.
# For Redis Sentinel to chdir to /tmp at startup is the simplest thing
# for the process to don't interfere with administrative tasks such as
# unmounting filesystems.
dir /tmp
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
#
# Tells Sentinel to monitor this master, and to consider it in O_DOWN
# (Objectively Down) state only if at least <quorum> sentinels agree.
#
# Note that whatever is the ODOWN quorum, a Sentinel will require to
# be elected by the majority of the known Sentinels in order to
# start a failover, so no failover can be performed in minority.
#
# Replicas are auto-discovered, so you don't need to specify replicas in
# any way. Sentinel itself will rewrite this configuration file adding
# the replicas using additional configuration options.
# Also note that the configuration file is rewritten when a
# replica is promoted to master.
#
# Note: master name should not include special characters or spaces.
# The valid charset is A-z 0-9 and the three characters ".-_".
sentinel monitor scality-s3 127.0.0.1 6379 1

View File

@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
SETUP_CMD="start"
fi
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "setup_$2.log" &
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
bash tests/utils/wait_for_local_port.bash $PORT 40
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "test_$2.log"
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"

View File

@ -0,0 +1,14 @@
[program:buildbot_worker]
command=/bin/sh -c 'buildbot-worker create-worker . "%(ENV_BUILDMASTER)s:%(ENV_BUILDMASTER_PORT)s" "%(ENV_WORKERNAME)s" "%(ENV_WORKERPASS)s" && buildbot-worker start --nodaemon'
autostart=true
autorestart=false
[program:redis_server]
command=/usr/bin/redis-server
autostart=true
autorestart=false
[program:redis_sentinel]
command=/usr/bin/redis-server /etc/sentinel.conf --sentinel
autostart=true
autorestart=false

View File

@ -0,0 +1,5 @@
build-essential
redis-server
python3
python3-pip
yarn

View File

@ -1,20 +0,0 @@
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json
WORKDIR ${HOME_DIR}/utapi
COPY ./package.json ./yarn.lock ${HOME_DIR}/utapi
# Remove when gitcache is sorted out
RUN rm /root/.gitconfig
RUN yarn install --production --frozen-lockfile --network-concurrency 1
COPY . ${HOME_DIR}/utapi
RUN chown -R ${USER} ${HOME_DIR}/utapi
USER ${USER}
CMD bash -c "source ${CONF_DIR}/env && export && supervisord -c ${CONF_DIR}/${SUPERVISORD_CONF}"

View File

@ -1,17 +0,0 @@
FROM redis:alpine
ENV S6_VERSION 2.0.0.1
ENV EXPORTER_VERSION 1.24.0
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz
RUN wget https://github.com/oliver006/redis_exporter/releases/download/v${EXPORTER_VERSION}/redis_exporter-v${EXPORTER_VERSION}.linux-amd64.tar.gz -O redis_exporter.tar.gz \
&& tar xzf redis_exporter.tar.gz -C / \
&& cd .. \
&& mv /redis_exporter-v${EXPORTER_VERSION}.linux-amd64/redis_exporter /usr/local/bin/redis_exporter
ADD ./images/redis/s6 /etc
CMD /init

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis exporter"
exec redis_exporter

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis"
exec redis-server

View File

@ -1,2 +0,0 @@
standalone.host = 0.0.0.0
standalone.port = 4802

View File

@ -1,35 +1,12 @@
FROM golang:1.14-alpine as builder
ENV WARP10_EXPORTER_VERSION 2.7.5
RUN apk add zip unzip build-base \
&& wget -q -O exporter.zip https://github.com/centreon/warp10-sensision-exporter/archive/refs/heads/master.zip \
&& unzip exporter.zip \
&& cd warp10-sensision-exporter-master \
&& go mod download \
&& cd tools \
&& go run generate_sensision_metrics.go ${WARP10_EXPORTER_VERSION} \
&& cp sensision.go ../collector/ \
&& cd .. \
&& go build -a -o /usr/local/go/warp10_sensision_exporter
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
# Override baked in version
# Remove when updating to a numbered release
ENV WARP10_VERSION 2.8.1-95-g73e7de80
FROM warp10io/warp10:2.6.0
ENV S6_VERSION 2.0.0.1
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
ENV WARP10_CONF_TEMPLATES ${WARP10_HOME}/conf.templates/standalone
ENV SENSISION_DATA_DIR /data/sensision
ENV SENSISION_PORT 8082
# Modify Warp 10 default config
ENV standalone.home /opt/warp10
ENV standalone.host 0.0.0.0
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
@ -38,19 +15,10 @@ RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_V
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz
# Install jmx exporter
ADD https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar /opt/jmx_prom_agent.jar
ADD ./images/warp10/jmx_prom.yaml /opt/jmx_prom.yaml
# Install protobuf extestion
ADD ./images/warp10/warp10-ext-protobuf-1.2.2-uberjar.jar /opt/warp10/lib/
# Install Sensision exporter
COPY --from=builder /usr/local/go/warp10_sensision_exporter /usr/local/bin/warp10_sensision_exporter
ADD https://dl.bintray.com/senx/maven/io/warp10/warp10-ext-protobuf/1.1.0-uberjar/warp10-ext-protobuf-1.1.0-uberjar.jar /opt/warp10/lib/
ADD ./images/warp10/s6 /etc
ADD ./warpscript /usr/local/share/warpscript
ADD ./images/warp10/static.tokens /
ADD ./images/warp10/90-default-host-port.conf $WARP10_CONF_TEMPLATES/90-default-host-port.conf
CMD /init

View File

@ -1,2 +0,0 @@
---
startDelaySeconds: 0

View File

@ -14,9 +14,4 @@ ensureDir "$WARP10_DATA_DIR/logs"
ensureDir "$WARP10_DATA_DIR/conf"
ensureDir "$WARP10_DATA_DIR/data/leveldb"
ensureDir "$WARP10_DATA_DIR/data/datalog"
ensureDir "$WARP10_DATA_DIR/data/datalog_done"
ensureDir "$SENSISION_DATA_DIR"
ensureDir "$SENSISION_DATA_DIR/logs"
ensureDir "$SENSISION_DATA_DIR/conf"
ensureDir "/var/run/sensision"
ensureDir "$WARP10_DATA_DIR/data/datalog_done"

View File

@ -2,13 +2,10 @@
set -eu
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
WARP10_SECRETS="$WARP10_CONFIG_DIR/00-secrets.conf"
if [ ! -f "$WARP10_SECRETS" ]; then
cp "$WARP10_CONF_TEMPLATES/00-secrets.conf.template" "$WARP10_SECRETS"
/usr/bin/java -cp ${WARP10_JAR} -Dfile.encoding=UTF-8 io.warp10.GenerateCryptoKey ${WARP10_SECRETS}
echo "warp10.manager.secret = scality" >> $WARP10_SECRETS
python "${WARP10_HOME}/etc/generate_crypto_key.py" "$WARP10_SECRETS"
fi

View File

@ -1,14 +1,9 @@
#!/usr/bin/with-contenv sh
echo "Installing warp 10 config"
for path in $WARP10_CONF_TEMPLATES/*; do
name="$(basename $path .template)"
if [ ! -f "$WARP10_DATA_DIR/conf/$name" ]; then
cp "$path" "$WARP10_DATA_DIR/conf/$name"
echo "Copied $name to $WARP10_DATA_DIR/conf/$name"
fi
done
echo "Installing sensision config"
cp ${SENSISION_HOME}/templates/sensision.template ${SENSISION_DATA_DIR}/conf/sensision.conf
cp ${SENSISION_HOME}/templates/log4j.properties.template ${SENSISION_DATA_DIR}/conf/log4j.properties
done

View File

@ -14,10 +14,4 @@ ensure_link "$WARP10_HOME/logs" "$WARP10_DATA_DIR/logs"
ensure_link "$WARP10_HOME/etc/conf.d" "$WARP10_DATA_DIR/conf"
ensure_link "$WARP10_HOME/leveldb" "$WARP10_DATA_DIR/data/leveldb"
ensure_link "$WARP10_HOME/datalog" "$WARP10_DATA_DIR/data/datalog"
ensure_link "$WARP10_HOME/datalog_done" "$WARP10_DATA_DIR/data/datalog_done"
ensure_link "$SENSISION_HOME/etc" "${SENSISION_DATA_DIR}/conf"
ensure_link "$SENSISION_HOME/logs" "${SENSISION_DATA_DIR}/logs"
ensure_link /var/run/sensision/metrics ${SENSISION_HOME}/metrics
ensure_link /var/run/sensision/targets ${SENSISION_HOME}/targets
ensure_link /var/run/sensision/queued ${SENSISION_HOME}/queued
ensure_link "$WARP10_HOME/datalog_done" "$WARP10_DATA_DIR/data/datalog_done"

View File

@ -0,0 +1,28 @@
#!/usr/bin/with-contenv sh
set -eu
JAVA="/usr/bin/java"
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
INITIAL_TOKENS="$WARP10_CONFIG_DIR/initial.token"
if [ ! -f "$INITIAL_TOKENS" ]; then
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path "*/\.*" -name "*.conf" | sort | tr '\n' ' ' 2> /dev/null)"
# Look for a set token secret and use it for generation
secret=`${JAVA} -cp ${WARP10_CP} io.warp10.WarpConfig ${CONFIG_FILES} . 'token.secret' | sed -n 's/^@CONF@ //p' | sed -n 's/^token.secret[^=]*=//p'`
if [[ "${secret}" != "null" ]]; then
sed -i.bak -e "s|^{{secret}}|'"${secret}"'|" ${WARP10_HOME}/templates/warp10-tokengen.mc2
else
sed -i.bak -e "s|^{{secret}}||" ${WARP10_HOME}/templates/warp10-tokengen.mc2
fi
rm ${WARP10_HOME}/templates/warp10-tokengen.mc2.bak
# Generate read/write tokens valid for a period of 100 years. We use 'io.warp10.bootstrap' as application name.
${JAVA} -cp ${WARP10_JAR} io.warp10.worf.TokenGen ${CONFIG_FILES} ${WARP10_HOME}/templates/warp10-tokengen.mc2 $INITIAL_TOKENS
sed -i.bak 's/^.\{1\}//;$ s/.$//' $INITIAL_TOKENS # Remove first and last character
rm "${INITIAL_TOKENS}.bak"
fi

View File

@ -1,9 +0,0 @@
#!/usr/bin/with-contenv sh
chmod 1733 "$SENSISION_HOME/metrics"
chmod 1733 "$SENSISION_HOME/targets"
chmod 700 "$SENSISION_HOME/queued"
sed -i 's/@warp:WriteToken@/'"writeTokenStatic"'/' $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e "s_^sensision\.home.*_sensision\.home = ${SENSISION_HOME}_" $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e 's_^sensision\.qf\.url\.default.*_sensision\.qf\.url\.default=http://127.0.0.1:4802/api/v0/update_' $SENSISION_DATA_DIR/conf/sensision.conf

View File

@ -1,12 +0,0 @@
#!/usr/bin/with-contenv sh
EXPORTER_CMD="warp10_sensision_exporter --warp10.url=http://localhost:${SENSISION_PORT}/metrics"
if [ -f "/usr/local/bin/warp10_sensision_exporter" -a -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision exporter with $EXPORTER_CMD ..."
exec $EXPORTER_CMD
else
echo "Sensision is disabled. Not starting exporter."
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -1,25 +0,0 @@
#!/usr/bin/with-contenv sh
JAVA="/usr/bin/java"
JAVA_OPTS=""
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${SENSISION_VERSION}.jar
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
SENSISION_CLASS=io.warp10.sensision.Main
export MALLOC_ARENA_MAX=1
if [ -z "$SENSISION_HEAP" ]; then
SENSISION_HEAP=64m
fi
SENSISION_CMD="${JAVA} ${JAVA_OPTS} -Xmx${SENSISION_HEAP} -Dsensision.server.port=${SENSISION_PORT} ${SENSISION_OPTS} -Dsensision.config=${SENSISION_CONFIG} -cp ${SENSISION_CP} ${SENSISION_CLASS}"
if [ -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision with $SENSISION_CMD ..."
exec $SENSISION_CMD | tee -a ${SENSISION_HOME}/logs/sensision.log
else
echo "Sensision is disabled. Not starting."
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -1,43 +1,13 @@
#!/usr/bin/with-contenv sh
export SENSISIONID=warp10
export MALLOC_ARENA_MAX=1
JAVA="/usr/bin/java"
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CLASS=io.warp10.standalone.Warp
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path '*/.*' -name '*.conf' | sort | tr '\n' ' ' 2> /dev/null)"
LOG4J_CONF=${WARP10_HOME}/etc/log4j.properties
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path "*/\.*" -name "*.conf" | sort | tr '\n' ' ' 2> /dev/null)"
if [ -z "$WARP10_HEAP" ]; then
WARP10_HEAP=1g
fi
if [ -z "$WARP10_HEAP_MAX" ]; then
WARP10_HEAP_MAX=4g
fi
JAVA_OPTS="-Dlog4j.configuration=file:${LOG4J_CONF} ${JAVA__EXTRA_OPTS} -Djava.awt.headless=true -Xms${WARP10_HEAP} -Xmx${WARP10_HEAP_MAX} -XX:+UseG1GC"
SENSISION_OPTS=
if [ -n "$ENABLE_SENSISION" ]; then
_SENSISION_LABELS=
# Expects a comma seperated list of key=value ex key=value,foo=bar
if [ -n "$SENSISION_LABELS" ]; then
_SENSISION_LABELS="-Dsensision.default.labels=$SENSISION_LABELS"
fi
SENSISION_OPTS="${_SENSISION_LABELS} -Dsensision.events.dir=/var/run/sensision/metrics -Dfile.encoding=UTF-8 ${SENSISION_EXTRA_OPTS}"
fi
JMX_EXPORTER_OPTS=
if [ -n "$ENABLE_JMX_EXPORTER" ]; then
JMX_EXPORTER_OPTS="-javaagent:/opt/jmx_prom_agent.jar=4803:/opt/jmx_prom.yaml ${JMX_EXPORTER_EXTRA_OPTS}"
echo "Starting jmx exporter with Warp 10."
fi
WARP10_CMD="${JAVA} ${JMX_EXPORTER_OPTS} ${JAVA_OPTS} ${SENSISION_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
WARP10_CMD="${JAVA} ${JAVA_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
echo "Starting Warp 10 with $WARP10_CMD ..."
exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log
exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log

View File

@ -1,9 +0,0 @@
token.write.0.name=writeTokenStatic
token.write.0.producer=42424242-4242-4242-4242-424242424242
token.write.0.owner=42424242-4242-4242-4242-424242424242
token.write.0.app=utapi
token.read.0.name=readTokenStatic
token.read.0.owner=42424242-4242-4242-4242-424242424242
token.read.0.app=utapi

View File

@ -1,13 +1,35 @@
/* eslint-disable no-bitwise */
const assert = require('assert');
const fs = require('fs');
const path = require('path');
/**
* Reads from a config file and returns the content as a config object
*/
class Config {
constructor(config) {
this.component = config.component;
constructor() {
/*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
this._basePath = path.resolve(__dirname, '..');
this.path = `${this._basePath}/config.json`;
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
this.path = process.env.UTAPI_CONFIG_FILE;
}
// Read config automatically
this._getConfig();
}
_getConfig() {
let config;
try {
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
config = JSON.parse(data);
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
this.port = 9500;
if (config.port !== undefined) {
@ -93,26 +115,18 @@ class Config {
}
}
if (config.vaultclient) {
// Instance passed from outside
this.vaultclient = config.vaultclient;
this.vaultd = null;
} else {
// Connection data
this.vaultclient = null;
this.vaultd = {};
if (config.vaultd) {
if (config.vaultd.port !== undefined) {
assert(Number.isInteger(config.vaultd.port)
&& config.vaultd.port > 0,
'bad config: vaultd port must be a positive integer');
this.vaultd.port = config.vaultd.port;
}
if (config.vaultd.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.vaultd.host = config.vaultd.host;
}
this.vaultd = {};
if (config.vaultd) {
if (config.vaultd.port !== undefined) {
assert(Number.isInteger(config.vaultd.port)
&& config.vaultd.port > 0,
'bad config: vaultd port must be a positive integer');
this.vaultd.port = config.vaultd.port;
}
if (config.vaultd.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.vaultd.host = config.vaultd.host;
}
}
@ -127,11 +141,12 @@ class Config {
const { key, cert, ca } = config.certFilePaths
? config.certFilePaths : {};
if (key && cert) {
const keypath = key;
const certpath = cert;
const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
const certpath = (cert[0] === '/')
? cert : `${this._basePath}/${cert}`;
let capath;
if (ca) {
capath = ca;
capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`);
}
@ -157,13 +172,8 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}
if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}
return config;
}
}
module.exports = Config;
module.exports = new Config();

View File

@ -33,10 +33,7 @@ class Datastore {
* @return {undefined}
*/
set(key, value, cb) {
return this._client.call(
(backend, done) => backend.set(key, value, done),
cb,
);
return this._client.set(key, value, cb);
}
/**
@ -48,7 +45,7 @@ class Datastore {
*/
setExpire(key, value, ttl) {
// This method is a Promise because no callback is given.
return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
return this._client.set(key, value, 'EX', ttl, 'NX');
}
/**
@ -57,8 +54,7 @@ class Datastore {
* @return {undefined}
*/
del(key) {
// This method is a Promise because no callback is given.
return this._client.call(backend => backend.del(key));
return this._client.del(key);
}
/**
@ -68,7 +64,7 @@ class Datastore {
* @return {undefined}
*/
get(key, cb) {
return this._client.call((backend, done) => backend.get(key, done), cb);
return this._client.get(key, cb);
}
/**
@ -78,7 +74,7 @@ class Datastore {
* @return {undefined}
*/
incr(key, cb) {
return this._client.call((backend, done) => backend.incr(key, done), cb);
return this._client.incr(key, cb);
}
/**
@ -89,7 +85,7 @@ class Datastore {
* @return {undefined}
*/
incrby(key, value, cb) {
return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
return this._client.incrby(key, value, cb);
}
/**
@ -99,7 +95,7 @@ class Datastore {
* @return {undefined}
*/
decr(key, cb) {
return this._client.call((backend, done) => backend.decr(key, done), cb);
return this._client.decr(key, cb);
}
/**
@ -110,7 +106,7 @@ class Datastore {
* @return {undefined}
*/
decrby(key, value, cb) {
return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
return this._client.decrby(key, value, cb);
}
/**
@ -122,7 +118,7 @@ class Datastore {
* @return {undefined}
*/
zadd(key, score, value, cb) {
return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
return this._client.zadd(key, score, value, cb);
}
/**
@ -135,7 +131,7 @@ class Datastore {
* @return {undefined}
*/
zrange(key, min, max, cb) {
return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
return this._client.zrange(key, min, max, cb);
}
/**
@ -148,7 +144,7 @@ class Datastore {
* @return {undefined}
*/
zrangebyscore(key, min, max, cb) {
return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
return this._client.zrangebyscore(key, min, max, cb);
}
/**
@ -161,12 +157,9 @@ class Datastore {
* @return {undefined}
*/
bZrangebyscore(keys, min, max, cb) {
return this._client.call(
(backend, done) => backend
.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
.exec(done),
cb,
);
return this._client.pipeline(keys.map(
item => ['zrangebyscore', item, min, max],
)).exec(cb);
}
/**
@ -176,9 +169,7 @@ class Datastore {
* @return {undefined}
*/
batch(cmds, cb) {
return this._client.call((backend, done) => {
backend.multi(cmds).exec(done);
}, cb);
return this._client.multi(cmds).exec(cb);
}
/**
@ -188,7 +179,7 @@ class Datastore {
* @return {undefined}
*/
pipeline(cmds, cb) {
return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
return this._client.pipeline(cmds).exec(cb);
}
/**
@ -198,21 +189,20 @@ class Datastore {
* @return {undefined}
*/
multi(cmds, cb) {
return this._client.call((backend, done) =>
backend.multi(cmds).exec((err, res) => {
if (err) {
return done(err);
}
const flattenRes = [];
const resErr = res.filter(item => {
flattenRes.push(item[1]);
return item[0] !== null;
});
if (resErr && resErr.length > 0) {
return done(resErr);
}
return done(null, flattenRes);
}), cb);
return this._client.multi(cmds).exec((err, res) => {
if (err) {
return cb(err);
}
const flattenRes = [];
const resErr = res.filter(item => {
flattenRes.push(item[1]);
return item[0] !== null;
});
if (resErr && resErr.length > 0) {
return cb(resErr);
}
return cb(null, flattenRes);
});
}
/**
@ -225,7 +215,7 @@ class Datastore {
* @return {undefined}
*/
zremrangebyscore(key, min, max, cb) {
return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
return this._client.zremrangebyscore(key, min, max, cb);
}
/**
@ -236,7 +226,7 @@ class Datastore {
* @return {undefined}
*/
lpush(key, val, cb) {
return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
return this._client.lpush(key, val, cb);
}
/**
@ -246,7 +236,7 @@ class Datastore {
* @return {undefined}
*/
rpop(key, cb) {
return this._client.call((backend, done) => backend.rpop(key, done), cb);
return this._client.rpop(key, cb);
}
/**
@ -258,7 +248,7 @@ class Datastore {
* @return {undefined}
*/
lrange(key, start, stop, cb) {
return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
return this._client.lrange(key, start, stop, cb);
}
/**
@ -268,7 +258,7 @@ class Datastore {
* @return {undefined}
*/
llen(key, cb) {
return this._client.call((backend, done) => backend.llen(key, done), cb);
return this._client.llen(key, cb);
}
/**
@ -279,7 +269,7 @@ class Datastore {
* @return {undefined}
*/
publish(channel, message, cb) {
return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
return this._client.publish(channel, message, cb);
}
/**
@ -290,7 +280,7 @@ class Datastore {
* @return {undefined}
*/
scan(cursor, pattern, cb) {
return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
return this._client.scan(cursor, 'match', pattern, cb);
}
}

View File

@ -6,6 +6,8 @@ const async = require('async');
const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const Vault = require('./Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -21,6 +23,7 @@ class ListMetrics {
constructor(metric, component) {
this.metric = metric;
this.service = component;
this.vault = new Vault(config);
}
/**
@ -80,10 +83,9 @@ class ListMetrics {
const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
@ -122,28 +124,7 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
return async.mapLimit(list.message.body, 5,
(item, next) => this.getMetrics(item.canonicalId, timeRange,
datastore, log, (err, res) => {
if (err) {
return next(err);
}
return next(null, Object.assign({}, res,
{ accountId: item.accountId }));
}),
cb);
});
}
return async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log,
async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log,
next), cb);
}
@ -312,10 +293,11 @@ class ListMetrics {
});
if (!areMetricsPositive) {
log.info('negative metric value found', {
error: resource,
method: 'ListMetrics.getMetrics',
});
return cb(errors.InternalError.customizeDescription(
'Utapi is in a transient state for this time period as '
+ 'metrics are being collected. Please try again in a few '
+ 'minutes.',
));
}
/**
* Batch result is of the format

View File

@ -7,7 +7,7 @@ const werelogs = require('werelogs');
const { errors } = require('arsenal');
const Datastore = require('./Datastore');
const { generateKey, generateCounter, generateStateKey } = require('./schema');
const redisClientv2 = require('../utils/redisClientv2');
const redisClient = require('../utils/redisClient');
const member = require('../utils/member');
const methods = {
@ -63,6 +63,7 @@ const methods = {
getObjectTagging: { method: '_genericPushMetric', changesData: false },
putObject: { method: '_genericPushMetricPutObject', changesData: true },
copyObject: { method: '_genericPushMetricPutObject', changesData: true },
putData: { method: '_genericPushMetricPutObject', changesData: true },
putObjectAcl: { method: '_genericPushMetric', changesData: true },
putObjectLegalHold: { method: '_genericPushMetric', changesData: true },
putObjectRetention: { method: '_genericPushMetric', changesData: true },
@ -90,9 +91,6 @@ const methods = {
},
putBucketObjectLock: { method: '_genericPushMetric', changesData: true },
getBucketObjectLock: { method: '_genericPushMetric', changesData: true },
replicateObject: { method: '_genericPushMetricPutObject', changesData: true },
replicateTags: { method: '_genericPushMetric', changesData: true },
replicateDelete: { method: '_pushMetricDeleteMarkerObject', changesData: true },
};
const metricObj = {
@ -129,7 +127,7 @@ class UtapiClient {
this.enabledOperationCounters = [];
this.disableClient = true;
if (config && !config.disableClient) {
if (config) {
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
@ -144,11 +142,11 @@ class UtapiClient {
}
if (config.redis) {
this.ds = new Datastore()
.setClient(redisClientv2(config.redis, this.log));
.setClient(redisClient(config.redis, this.log));
}
if (config.localCache) {
this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache, this.log));
.setClient(redisClient(config.localCache, this.log));
}
if (config.component) {
// The configuration uses the property `component`, while
@ -494,7 +492,7 @@ class UtapiClient {
return done();
}),
// if cursor is 0, it reached end of scan
cb => cb(null, cursor === '0'),
() => cursor === '0',
err => callback(err, keys),
);
}
@ -544,13 +542,10 @@ class UtapiClient {
const paramsArr = this._getParamsArr(params);
paramsArr.forEach(p => {
cmds.push(['incr', generateCounter(p, 'numberOfObjectsCounter')]);
const counterAction = action === 'putDeleteMarkerObject' ? 'deleteObject' : action;
if (this._isCounterEnabled(counterAction)) {
cmds.push(['incr', generateKey(p, counterAction, timestamp)]);
if (this._isCounterEnabled('deleteObject')) {
cmds.push(['incr', generateKey(p, 'deleteObject', timestamp)]);
}
cmds.push(['zrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp]);
});
return this.ds.batch(cmds, (err, results) => {
if (err) {
log.error('error pushing metric', {
@ -584,48 +579,13 @@ class UtapiClient {
// empty.
actionCounter = Number.isNaN(actionCounter)
|| actionCounter < 0 ? 1 : actionCounter;
if (Number.isInteger(params.byteLength)) {
/* byteLength is passed in from cloudserver under the follow conditions:
* - bucket versioning is suspended
* - object version id is null
* - the content length of the object exists
* In this case, the master key is deleted and replaced with a delete marker.
* The decrement accounts for the deletion of the master key when utapi reports
* on the number of objects.
*/
actionCounter -= 1;
}
const key = generateStateKey(p, 'numberOfObjects');
const byteArr = results[index + commandsGroupSize - 1][1];
const oldByteLength = byteArr ? parseInt(byteArr[0], 10) : 0;
const newByteLength = member.serialize(Math.max(0, oldByteLength - params.byteLength));
cmds2.push(
['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(actionCounter)],
);
if (Number.isInteger(params.byteLength)) {
cmds2.push(
['decr', generateCounter(p, 'numberOfObjectsCounter')],
['decrby', generateCounter(p, 'storageUtilizedCounter'), params.byteLength],
);
}
if (byteArr) {
cmds2.push(
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), timestamp, newByteLength],
);
}
return true;
});
if (noErr) {
return this.ds.batch(cmds2, cb);
}
@ -1067,10 +1027,10 @@ class UtapiClient {
storageUtilizedDelta],
[redisCmd, generateCounter(p, 'numberOfObjectsCounter')],
);
if (this._isCounterEnabled(action)) {
if (action !== 'putData' && this._isCounterEnabled(action)) {
cmds.push(['incr', generateKey(p, action, timestamp)]);
}
if (action === 'putObject' || action === 'replicateObject') {
if (action === 'putObject' || action === 'putData') {
cmds.push(
['incrby', generateKey(p, 'incomingBytes', timestamp),
newByteLength],

View File

@ -7,28 +7,21 @@ const { jsutil } = require('arsenal');
const werelogs = require('werelogs');
const Datastore = require('./Datastore');
const RedisClient = require('../libV2/redis');
const redisClient = require('../utils/redisClient');
const REINDEX_SCHEDULE = '0 0 * * Sun';
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
const REINDEX_LOCK_TTL = (60 * 60) * 24;
const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== undefined
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex {
constructor(config) {
this._enabled = false;
this._schedule = REINDEX_SCHEDULE;
this._redis = {
this._sentinel = {
host: '127.0.0.1',
port: 16379,
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
};
this._bucketd = {
host: '127.0.0.1',
@ -46,13 +39,14 @@ class UtapiReindex {
if (config && config.password) {
this._password = config.password;
}
if (config && config.redis) {
if (config && config.sentinel) {
const {
name, sentinelPassword, sentinels,
} = config.redis;
this._redis.name = name || this._redis.name;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._redis.sentinels = sentinels || this._redis.sentinels;
host, port, name, sentinelPassword,
} = config.sentinel;
this._sentinel.host = host || this._sentinel.host;
this._sentinel.port = port || this._sentinel.port;
this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
}
if (config && config.bucketd) {
const { host, port } = config.bucketd;
@ -64,20 +58,19 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
this._requestLogger = this._log.newRequestLogger();
}
_getRedisClient() {
const client = new RedisClient({
sentinels: this._redis.sentinels,
name: this._redis.name,
sentinelPassword: this._redis.sentinelPassword,
return redisClient({
sentinels: [{
host: this._sentinel.host,
port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password,
});
client.connect();
return client;
}, this._log);
}
_lock() {
@ -88,18 +81,17 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY);
}
_buildFlags(sentinel) {
_buildFlags() {
const flags = {
/* eslint-disable camelcase */
sentinel_ip: sentinel.host,
sentinel_port: sentinel.port,
sentinel_cluster_name: this._redis.name,
sentinel_ip: this._sentinel.host,
sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._sentinel.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
};
if (this._redis.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword;
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}
/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
@ -108,17 +100,17 @@ class UtapiReindex {
opts.push(name);
opts.push(flags[flag]);
});
if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}
_runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(remainingSentinels.shift());
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
_runScript(path, done) {
const flags = this._buildFlags();
this._requestLogger.debug(`launching subprocess ${path} `
+ `with flags: ${flags}`);
const process = childProcess.spawn('python3.4', [
path,
...flags,
]);
process.stdout.on('data', data => {
this._requestLogger.info('received output from script', {
output: Buffer.from(data).toString(),
@ -143,17 +135,6 @@ class UtapiReindex {
statusCode: code,
script: path,
});
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else {
this._requestLogger.info('script exited successfully', {
statusCode: code,
@ -164,11 +145,6 @@ class UtapiReindex {
});
}
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock()

View File

@ -5,8 +5,8 @@ const { scheduleJob } = require('node-schedule');
const werelogs = require('werelogs');
const UtapiClient = require('./UtapiClient');
const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const safeJsonParse = require('../utils/safeJsonParse');
const redisClientv2 = require('../utils/redisClientv2');
// Every five minutes. Cron-style scheduling used by node-schedule.
const REPLAY_SCHEDULE = '*/5 * * * *';
@ -42,7 +42,7 @@ class UtapiReplay {
assert(config.localCache, `${message}: localCache`);
this.utapiClient = new UtapiClient(config);
this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache, this.log));
.setClient(redisClient(config.localCache, this.log));
if (config.replaySchedule) {
this.replaySchedule = config.replaySchedule;
}

View File

@ -14,15 +14,6 @@ class UtapiRequest {
this._datastore = null;
this._requestQuery = null;
this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
}
/**

View File

@ -53,16 +53,6 @@ class Memory {
this.data = {};
}
/**
* A simple wrapper provided for API compatibility with redis
* @param {Function} func - Function to call
* @param {callback} cb - callback
* @returns {undefined}
*/
call(func, cb) {
return func(this, cb);
}
/**
* Set key to hold a value
* @param {string} key - data key

View File

@ -1,21 +1,16 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests
import redis
import json
import ast
import sys
from threading import Thread
import time
import urllib
import re
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
import argparse
def get_options():
parser = argparse.ArgumentParser()
@ -34,19 +29,8 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password
r = redis.Redis(
host=ip,
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
r = redis.Redis(host=ip, port=port, db=0, password=password)
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
@ -114,4 +98,4 @@ if __name__ == '__main__':
data = U.read('accounts', userid)
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, data["files"], data["total_size"])
executor.submit(safe_print, content)
executor.submit(safe_print, content)

View File

@ -1,7 +1,4 @@
import argparse
import concurrent.futures as futures
import functools
import itertools
import json
import logging
import os
@ -9,13 +6,15 @@ import re
import sys
import time
import urllib
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures as futures
from pprint import pprint
import redis
import requests
from requests import ConnectionError, HTTPError, Timeout
import itertools
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex')
@ -25,9 +24,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -35,41 +31,11 @@ def get_options():
parser.add_argument("-v", "--redis-password", default=None, help="Redis AUTH Password")
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
parser.add_argument("-w", "--worker", default=10, help="Number of workers")
parser.add_argument("-b", "--bucket", default=False, help="Bucket to be processed")
return parser.parse_args()
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
def chunks(iterable, size):
def chunks(iterable,size):
it = iter(iterable)
chunk = tuple(itertools.islice(it,size))
while chunk:
@ -82,38 +48,22 @@ def _encoded(func):
return urllib.parse.quote(val.encode('utf-8'))
return inner
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])
Bucket = namedtuple('Bucket', ['userid', 'name'])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
class MaxRetriesReached(Exception):
def __init__(self, url):
super().__init__('Max retries reached for request to %s'%url)
class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient:
'''Performs Listing calls against bucketd'''
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__url_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
def __init__(self, bucketd_addr=None):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()
def _do_req(self, url, check_500=True, **kwargs):
# Add 1 for the initial request
for x in range(self._max_retries + 1):
while True:
try:
resp = self._session.get(url, timeout=30, verify=False, headers=self.__headers, **kwargs)
if check_500 and resp.status_code == 500:
@ -125,9 +75,7 @@ class BucketDClient:
_log.exception(e)
_log.error('Error during listing, sleeping 5 secs %s'%url)
time.sleep(5)
raise MaxRetriesReached(url)
def _list_bucket(self, bucket, **kwargs):
'''
Lists a bucket lazily until "empty"
@ -140,7 +88,7 @@ class BucketDClient:
parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
@ -153,9 +101,6 @@ class BucketDClient:
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
@ -163,9 +108,6 @@ class BucketDClient:
_log.error('Invalid listing response body! bucket:%s params:%s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
continue
except MaxRetriesReached:
_log.error('Max retries reached listing bucket:%s'%bucket)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception during listing! bucket:%s params:%s'%(
@ -174,40 +116,10 @@ class BucketDClient:
yield resp.status_code, payload
if isinstance(payload, dict):
is_truncated = payload.get('IsTruncated', False)
else:
else:
is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def list_buckets(self):
def get_next_marker(p):
if p is None:
@ -219,24 +131,13 @@ class BucketDClient:
'maxKeys': 1000,
'marker': get_next_marker
}
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)
buckets.append(Bucket(*match.groups()))
yield buckets
if buckets:
yield buckets
def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@ -260,7 +161,7 @@ class BucketDClient:
'splitter': '..|..',
'prefix': get_next_marker,
'uploadIdMarker': get_next_upload_id,
}
}
keys = []
for status_code, payload in self._list_bucket(_bucket, **params):
@ -273,12 +174,15 @@ class BucketDClient:
upload_id=key['value']['UploadId']))
return keys
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
def _sum_objects(self, listing):
count = 0
total_size = 0
last_key = None
try:
for obj in listing:
last_master = None
last_size = None
for _, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
for obj in contents:
count += 1
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
@ -286,52 +190,40 @@ class BucketDClient:
else:
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)
is_latest = obj['key'] != last_key
last_key = obj['key']
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue
count += 1
size = data["content-length"]
total_size += size
except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size
# If versioned, subtract the size of the master to avoid double counting
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
_log.info('Detected versioned key: %s - subtracting master size: %i'% (
obj['key'],
last_size,
))
total_size -= last_size
count -= 1
last_master = None
# Only save master versions
elif '\x00' not in obj['key']:
last_master = obj['key']
last_size = size
def _extract_listing(self, key, listing):
for status_code, payload in listing:
contents = payload[key] if isinstance(payload, dict) else payload
if contents is None:
raise InvalidListing('')
for obj in contents:
yield obj
return count, total_size
def count_bucket_contents(self, bucket):
def get_key_marker(p):
if p is None:
def get_next_marker(p):
if p is None or len(p) == 0:
return ''
return p.get('NextKeyMarker', '')
def get_vid_marker(p):
if p is None:
return ''
return p.get('NextVersionIdMarker', '')
return p[-1].get('key', '')
params = {
'listingType': 'DelimiterVersions',
'listingType': 'Basic',
'maxKeys': 1000,
'keyMarker': get_key_marker,
'versionIdMarker': get_vid_marker,
'gt': get_next_marker,
}
listing = self._list_bucket(bucket.name, **params)
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
count, total_size = self._sum_objects(self._list_bucket(bucket.name, **params))
return BucketContents(
bucket=bucket,
obj_count=count,
@ -339,15 +231,14 @@ class BucketDClient:
)
def count_mpu_parts(self, mpu):
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
def get_prefix(p):
if p is None:
return mpu.upload_id
return p.get('Contents', [{}])[-1].get('key', '')
@_encoded
@_encoded
def get_next_marker(p):
prefix = get_prefix(p)
return prefix + '..|..00000'
@ -360,53 +251,38 @@ class BucketDClient:
'listingType': 'Delimiter',
}
listing = self._list_bucket(shadow_bucket_name, **params)
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
count, total_size = self._sum_objects(self._list_bucket(_bucket, **params))
return BucketContents(
bucket=shadow_bucket,
obj_count=0, # MPU parts are not counted towards numberOfObjects
bucket=mpu.bucket._replace(name=_bucket),
obj_count=count,
total_size=total_size
)
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket):
'''
Takes an instance of BucketDClient and a bucket name, and returns a
tuple of BucketContents for the passed bucket and its mpu shadow bucket.
'''
try:
bucket_total = client.count_bucket_contents(bucket)
mpus = client.list_mpus(bucket)
if not mpus:
return bucket_total
total_size = bucket_total.total_size
bucket_total = client.count_bucket_contents(bucket)
mpus = client.list_mpus(bucket)
shadowbucket = bucket._replace(name=MPU_SHADOW_BUCKET_PREFIX + bucket.name)
if not mpus:
mpu_total = BucketContents(shadowbucket, 0, 0)
else:
mpu_totals = [client.count_mpu_parts(m) for m in mpus]
mpu_part_count = 0
mpu_total_size = 0
for mpu in mpu_totals:
total_size += mpu.total_size
mpu_part_count += mpu.obj_count
mpu_total_size += mpu.total_size
mpu_total = BucketContents(
shadowbucket,
mpu_part_count,
mpu_total_size
)
return bucket_total, mpu_total
return bucket_total._replace(total_size=total_size)
except Exception as e:
_log.exception(e)
_log.error('Error during listing. Removing from results bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
def update_report(report, key, obj_count, total_size):
'''Convenience function to update the report dicts'''
@ -418,22 +294,15 @@ def update_report(report, key, obj_count, total_size):
'obj_count': obj_count,
'total_size': total_size,
}
def get_redis_client(options):
sentinel = redis.Redis(
host=options.sentinel_ip,
port=options.sentinel_port,
db=0,
password=options.redis_password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
password=options.redis_password
)
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
return redis.Redis(
host=ip,
port=port,
@ -445,7 +314,7 @@ def update_redis(client, resource, name, obj_count, total_size):
timestamp = int(time.time() - 15 * 60) * 1000
obj_count_key = 's3:%s:%s:numberOfObjects' % (resource, name)
total_size_key = 's3:%s:%s:storageUtilized' % (resource, name)
client.zremrangebyscore(obj_count_key, timestamp, timestamp)
client.zremrangebyscore(total_size_key, timestamp, timestamp)
client.zadd(obj_count_key, {obj_count: timestamp})
@ -453,10 +322,6 @@ def update_redis(client, resource, name, obj_count, total_size):
client.set(obj_count_key + ':counter', obj_count)
client.set(total_size_key + ':counter', total_size)
def get_resources_from_redis(client, resource):
for key in redis_client.scan_iter('s3:%s:*:storageUtilized' % resource):
yield key.decode('utf-8').split(':')[2]
def log_report(resource, name, obj_count, total_size):
print('%s:%s:%s:%s'%(
resource,
@ -467,120 +332,30 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__':
options = get_options()
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
bucket_client = BucketDClient(options.bucketd_addr)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator:
bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()):
try:
total = job.result() # Summed bucket and shadowbucket totals
except InvalidListing:
_bucket = jobs[job]
_log.error('Failed to list bucket %s. Removing from results.'%_bucket.name)
# Add the bucket to observed_buckets anyway to avoid clearing existing metrics
observed_buckets.add(_bucket.name)
# If we can not list one of an account's buckets we can not update its total
failed_accounts.add(_bucket.userid)
continue
observed_buckets.add(total.bucket.name)
update_report(bucket_reports, total.bucket.name, total.obj_count, total.total_size)
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
for batch in bucket_client.list_buckets():
bucket_reports = {}
jobs = [executor.submit(index_bucket, bucket_client, b) for b in batch]
for job in futures.as_completed(jobs):
totals = job.result() # bucket and shadowbucket totals as tuple
for total in totals:
update_report(bucket_reports, total.bucket.name, total.obj_count, total.total_size)
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = recorded_buckets.difference(observed_buckets)
_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
else:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()
# Update total account reports in chunks
for chunk in chunks(account_reports.items(), ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()

View File

@ -52,9 +52,6 @@ const keys = {
getObjectRetention: prefix => `${prefix}GetObjectRetention`,
putObjectLegalHold: prefix => `${prefix}PutObjectLegalHold`,
getObjectLegalHold: prefix => `${prefix}GetObjectLegalHold`,
replicateObject: prefix => `${prefix}ReplicateObject`,
replicateTags: prefix => `${prefix}ReplicateTags`,
replicateDelete: prefix => `${prefix}ReplicateDelete`,
incomingBytes: prefix => `${prefix}incomingBytes`,
outgoingBytes: prefix => `${prefix}outgoingBytes`,
};

View File

@ -7,12 +7,13 @@ const { Clustering, errors, ipCheck } = require('arsenal');
const arsenalHttps = require('arsenal').https;
const { Logger } = require('werelogs');
const config = require('./Config');
const routes = require('../router/routes');
const Route = require('../router/Route');
const Router = require('../router/Router');
const UtapiRequest = require('../lib/UtapiRequest');
const Datastore = require('./Datastore');
const redisClientv2 = require('../utils/redisClientv2');
const redisClient = require('../utils/redisClient');
class UtapiServer {
/**
@ -27,12 +28,7 @@ class UtapiServer {
constructor(worker, port, datastore, logger, config) {
this.worker = worker;
this.port = port;
this.vault = config.vaultclient;
if (!this.vault) {
const Vault = require('./Vault');
this.vault = new Vault(config);
}
this.router = new Router(config, this.vault);
this.router = new Router(config);
this.logger = logger;
this.datastore = datastore;
this.server = null;
@ -75,7 +71,6 @@ class UtapiServer {
req.socket.setNoDelay();
const { query, path, pathname } = url.parse(req.url, true);
const utapiRequest = new UtapiRequest()
.setVault(this.vault)
.setRequest(req)
.setLog(this.logger.newRequestLogger())
.setResponse(res)
@ -98,7 +93,7 @@ class UtapiServer {
return this.errorResponse(utapiRequest, errors.AccessDenied);
}
const redisClient = this.datastore.getClient();
if (!redisClient.isReady) {
if (redisClient.status !== 'ready') {
return this.errorResponse(utapiRequest,
errors.InternalError.customizeDescription(
'Redis server is not ready',
@ -219,7 +214,8 @@ class UtapiServer {
* @property {object} params.log - logger configuration
* @return {undefined}
*/
function spawn(config) {
function spawn(params) {
Object.assign(config, params);
const {
workers, redis, log, port,
} = config;
@ -230,7 +226,7 @@ function spawn(config) {
});
const cluster = new Clustering(workers, logger);
cluster.start(worker => {
const datastore = new Datastore().setClient(redisClientv2(redis, logger));
const datastore = new Datastore().setClient(redisClient(redis, logger));
const server = new UtapiServer(worker, port, datastore, logger, config);
server.startup();
});

View File

@ -1,23 +1,10 @@
const schema = require('../schema');
const constants = require('../../constants');
/**
* Returns null iff the value is undefined.
* Returns the passed value otherwise.
*
* @param {*} value - Any value
* @returns {*} - Passed value or null
*/
function orNull(value) {
return value === undefined ? null : value;
}
class MemoryCache {
constructor() {
this._data = {};
this._shards = {};
this._prefix = 'utapi';
this._expirations = {};
}
// eslint-disable-next-line class-methods-use-this
@ -27,17 +14,9 @@ class MemoryCache {
// eslint-disable-next-line class-methods-use-this
async disconnect() {
Object.values(this._expirations).forEach(clearTimeout);
return true;
}
_expireKey(key, delay) {
if (this._expirations[key]) {
clearTimeout(this._expirations[key]);
}
this._expirations[key] = setTimeout(() => delete this._data[key], delay * 1000);
}
async getKey(key) {
return this._data[key];
}
@ -84,27 +63,6 @@ class MemoryCache {
async shardExists(shard) {
return this._shards[shard.toString()] !== undefined;
}
async updateCounters(metric) {
if (metric.sizeDelta) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, metric.account);
this._data[accountSizeKey] = (this._data[accountSizeKey] || 0) + metric.sizeDelta;
}
}
async updateAccountCounterBase(account, size) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
this._data[accountSizeKey] = 0;
this._data[accountSizeBaseKey] = size;
this._expireKey(accountSizeBaseKey, constants.counterBaseValueExpiration);
}
async fetchAccountSizeCounter(account) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
return [orNull(this._data[accountSizeKey]), orNull(this._data[accountSizeBaseKey])];
}
}
module.exports = MemoryCache;

View File

@ -1,8 +1,7 @@
const RedisClient = require('../../redis');
const IORedis = require('ioredis');
const schema = require('../schema');
const { LoggerContext } = require('../../utils');
const constants = require('../../constants');
const moduleLogger = new LoggerContext({
module: 'cache.backend.redis.RedisCache',
@ -17,8 +16,11 @@ class RedisCache {
async connect() {
moduleLogger.debug('Connecting to redis...');
this._redis = new RedisClient(this._options);
this._redis.connect();
this._redis = new IORedis(this._options);
this._redis
.on('error', err =>
moduleLogger.error(`error connecting to redis ${err}`))
.on('connect', () => moduleLogger.debug('connected to redis'));
return true;
}
@ -27,7 +29,7 @@ class RedisCache {
if (this._redis) {
try {
logger.debug('closing connection to redis');
await this._redis.disconnect();
await this._redis.quit();
} catch (error) {
logger.error('error while closing connection to redis', {
error,
@ -41,153 +43,138 @@ class RedisCache {
}
async getKey(key) {
return moduleLogger
.with({ method: 'getKey' })
.logAsyncError(() => this._redis.call(redis => redis.get(key)),
'error fetching key from redis', { key });
try {
return this._redis.get(key);
} catch (error) {
moduleLogger
.with({ method: 'getKey' })
.error('error fetching key from redis', { key });
throw error;
}
}
async setKey(key, value) {
return moduleLogger
.with({ method: 'setKey' })
.logAsyncError(async () => {
const res = await this._redis.call(redis => redis.set(key, value));
return res === 'OK';
}, 'error setting key in redis', { key });
try {
const res = await this._redis.set(key, value);
return res === 'OK';
} catch (error) {
moduleLogger
.with({ method: 'setKey' })
.error('error setting key in redis', { key });
throw error;
}
}
async addToShard(shard, metric) {
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
const logger = moduleLogger.with({ method: 'addToShard' });
return logger
.logAsyncError(async () => {
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
logger.debug('adding metric to shard', { metricKey, shardKey });
logger.debug('adding metric to shard', { metricKey, shardKey });
const [setResults, saddResults] = await this._redis
.call(redis => redis
.multi([
['set', metricKey, JSON.stringify(metric.getValue())],
['sadd', shardKey, metricKey],
['sadd', shardMasterKey, shardKey],
])
.exec());
let setResults;
let saddResults;
try {
[setResults, saddResults] = await this._redis
.multi([
['set', metricKey, JSON.stringify(metric.getValue())],
['sadd', shardKey, metricKey],
['sadd', shardMasterKey, shardKey],
])
.exec();
} catch (error) {
logger.error('error during redis command', { error });
throw error;
}
let success = true;
if (setResults[1] !== 'OK') {
moduleLogger.error('failed to set metric key', {
metricKey,
shardKey,
res: setResults[1],
});
success = false;
}
let success = true;
if (setResults[1] !== 'OK') {
moduleLogger.error('failed to set metric key', {
metricKey,
shardKey,
res: setResults[1],
});
success = false;
}
if (saddResults[1] !== 1) {
moduleLogger.error('metric key already present in shard', {
metricKey,
shardKey,
res: saddResults[1],
});
success = false;
}
return success;
}, 'error during redis command');
if (saddResults[1] !== 1) {
moduleLogger.error('metric key already present in shard', {
metricKey,
shardKey,
res: saddResults[1],
});
success = false;
}
return success;
}
async getKeysInShard(shard) {
return moduleLogger
.with({ method: 'getKeysInShard' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
return this._redis.call(redis => redis.smembers(shardKey));
}, 'error while fetching shard keys', { shard });
try {
const shardKey = schema.getShardKey(this._prefix, shard);
return this._redis.smembers(shardKey);
} catch (error) {
moduleLogger
.with({ method: 'getKeysInShard' })
.error('error while fetching shard keys', { shard, error });
throw error;
}
}
async fetchShard(shard) {
return moduleLogger
.with({ method: 'fetchShard' })
.logAsyncError(async () => {
const keys = await this.getKeysInShard(shard);
if (!keys.length) {
return [];
}
return this._redis.call(redis => redis.mget(...keys));
}, 'error while fetching shard data', { shard });
try {
const keys = await this.getKeysInShard(shard);
if (!keys.length) {
return [];
}
return this._redis.mget(...keys);
} catch (error) {
moduleLogger
.with({ method: 'fetchShard' })
.error('error while fetching shard data', { shard, error });
throw error;
}
}
async deleteShardAndKeys(shard) {
return moduleLogger
.with({ method: 'deleteShardAndKeys' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
const keys = await this.getKeysInShard(shard);
return this._redis.call(
redis => redis.multi([
['del', shardKey, ...keys],
['srem', shardMasterKey, shardKey],
]).exec(),
);
}, 'error while deleting shard', { shard });
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
try {
const keys = await this.getKeysInShard(shard);
return this._redis.multi([
['del', shardKey, ...keys],
['srem', shardMasterKey, shardKey],
]).exec();
} catch (error) {
moduleLogger
.with({ method: 'deleteShardAndKeys' })
.error('error while deleting shard', { shard, error });
throw error;
}
}
async shardExists(shard) {
return moduleLogger
.with({ method: 'shardExists' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
const res = await this._redis.call(redis => redis.exists(shardKey));
return res === 1;
}, 'error while checking shard', { shard });
const shardKey = schema.getShardKey(this._prefix, shard);
try {
const res = await this._redis.exists(shardKey);
return res === 1;
} catch (error) {
moduleLogger
.with({ method: 'shardExists' })
.error('error while checking shard', { shard, error });
throw error;
}
}
async getShards() {
return moduleLogger
.with({ method: 'getShards' })
.logAsyncError(async () => {
const shardMasterKey = schema.getShardMasterKey(this._prefix);
return this._redis.call(redis => redis.smembers(shardMasterKey));
}, 'error while fetching shards');
}
async updateCounters(metric) {
return moduleLogger
.with({ method: 'updateCounter' })
.logAsyncError(async () => {
if (metric.sizeDelta) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, metric.account);
await this._redis.call(redis => redis.incrby(accountSizeKey, metric.sizeDelta));
}
}, 'error while updating metric counters');
}
async updateAccountCounterBase(account, size) {
return moduleLogger
.with({ method: 'updateAccountCounterBase' })
.logAsyncError(async () => {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
await this._redis.call(async redis => {
await redis.mset(accountSizeKey, 0, accountSizeBaseKey, size);
await redis.expire(accountSizeBaseKey, constants.counterBaseValueExpiration);
});
}, 'error while updating metric counter base');
}
async fetchAccountSizeCounter(account) {
return moduleLogger
.with({ method: 'fetchAccountSizeCounter' })
.logAsyncError(async () => {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
const [counter, base] = await this._redis.call(redis => redis.mget(accountSizeKey, accountSizeBaseKey));
return [
counter !== null ? parseInt(counter, 10) : null,
base !== null ? parseInt(base, 10) : null,
];
}, 'error fetching account size counters', { account });
try {
const shardMasterKey = schema.getShardMasterKey(this._prefix);
return this._redis.smembers(shardMasterKey);
} catch (error) {
moduleLogger
.with({ method: 'getShards' })
.error('error while fetching shards', { error });
throw error;
}
}
}

35
libV2/cache/client.js vendored
View File

@ -3,55 +3,36 @@ const { shardFromTimestamp } = require('../utils');
class CacheClient {
constructor(config) {
this._prefix = config.prefix || 'utapi';
this._cacheBackend = config.cacheBackend;
this._counterBackend = config.counterBackend;
this._backend = config.backend;
}
async connect() {
return Promise.all([
this._cacheBackend.connect(),
this._counterBackend.connect(),
]);
return this._backend.connect();
}
async disconnect() {
return Promise.all([
this._cacheBackend.disconnect(),
this._counterBackend.disconnect(),
]);
return this._backend.disconnect();
}
async pushMetric(metric) {
const shard = shardFromTimestamp(metric.timestamp);
if (!(await this._cacheBackend.addToShard(shard, metric))) {
return false;
}
await this._counterBackend.updateCounters(metric);
return true;
return this._backend.addToShard(shard, metric);
}
async getMetricsForShard(shard) {
return this._cacheBackend.fetchShard(shard);
return this._backend.fetchShard(shard);
}
async deleteShard(shard) {
return this._cacheBackend.deleteShardAndKeys(shard);
return this._backend.deleteShardAndKeys(shard);
}
async shardExists(shard) {
return this._cacheBackend.shardExists(shard);
return this._backend.shardExists(shard);
}
async getShards() {
return this._cacheBackend.getShards();
}
async updateAccountCounterBase(account, size) {
return this._counterBackend.updateAccountCounterBase(account, size);
}
async fetchAccountSizeCounter(account) {
return this._counterBackend.fetchAccountSizeCounter(account);
return this._backend.getShards();
}
}

View File

@ -4,12 +4,11 @@ const CacheClient = require('./client');
const { MemoryCache, RedisCache } = require('./backend');
const cacheTypes = {
redis: conf => new RedisCache(conf),
redis: () => new RedisCache(config.cache),
memory: () => new MemoryCache(),
};
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis);
const backend = cacheTypes[config.cache.backend]();
module.exports = {
CacheClient,
@ -17,5 +16,5 @@ module.exports = {
MemoryCache,
RedisCache,
},
client: new CacheClient({ cacheBackend, counterBackend }),
client: new CacheClient({ backend }),
};

10
libV2/cache/schema.js vendored
View File

@ -10,18 +10,8 @@ function getShardMasterKey(prefix) {
return `${prefix}:shard:master`;
}
function getAccountSizeCounterKey(prefix, account) {
return `${prefix}:counters:account:${account}:size`;
}
function getAccountSizeCounterBaseKey(prefix, account) {
return `${prefix}:counters:account:${account}:size:base`;
}
module.exports = {
getShardKey,
getUtapiMetricKey,
getShardMasterKey,
getAccountSizeCounterKey,
getAccountSizeCounterBaseKey,
};

View File

@ -8,15 +8,9 @@ const needle = require('needle');
const levelup = require('levelup');
const memdown = require('memdown');
const encode = require('encoding-down');
/* eslint-enable import/no-extraneous-dependencies */
const { UtapiMetric } = require('../models');
const {
LoggerContext,
logEventFilter,
asyncOrCallback,
buildFilterChain,
} = require('../utils');
const { LoggerContext } = require('../utils');
/* eslint-enable import/no-extraneous-dependencies */
const moduleLogger = new LoggerContext({
module: 'client',
@ -76,24 +70,13 @@ class UtapiClient {
constructor(config) {
this._host = (config && config.host) || 'localhost';
this._port = (config && config.port) || '8100';
this._tls = (config && config.tls) || {};
this._transport = (config && config.tls) ? 'https' : 'http';
this._logger = (config && config.logger) || moduleLogger;
this._maxCachedMetrics = (config && config.maxCachedMetrics) || 200000; // roughly 100MB
this._numCachedMetrics = 0;
this._disableRetryCache = config && config.disableRetryCache;
this._retryCache = this._disableRetryCache
? null
: levelup(encode(memdown(), { valueEncoding: 'json' }));
this._retryCache = levelup(encode(memdown(), { valueEncoding: 'json' }));
this._drainTimer = null;
this._drainCanSchedule = true;
this._drainDelay = (config && config.drainDelay) || 30000;
this._suppressedEventFields = (config && config.suppressedEventFields) || null;
const eventFilters = (config && config.filter) || {};
this._shouldPushMetric = buildFilterChain(eventFilters);
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => moduleLogger.info(...args), 'utapi event filter enabled', eventFilters);
}
}
async join() {
@ -104,9 +87,9 @@ class UtapiClient {
async _pushToUtapi(metrics) {
const resp = await needle(
'post',
`${this._transport}://${this._host}:${this._port}/v2/ingest`,
`http://${this._host}:${this._port}/v2/ingest`,
metrics.map(metric => metric.getValue()),
{ json: true, ...this._tls },
{ json: true },
);
if (resp.statusCode !== 200) {
throw Error('failed to push metric, server returned non 200 status code',
@ -168,8 +151,7 @@ class UtapiClient {
try {
const resp = await needle(
'get',
`${this._transport}://${this._host}:${this._port}/_/healthcheck`,
this._tls,
`http://${this._host}:${this._port}/_/healthcheck`,
);
return resp.statusCode === 200;
} catch (error) {
@ -246,15 +228,10 @@ class UtapiClient {
}
async _pushMetric(data) {
let metric = data instanceof UtapiMetric
const metric = data instanceof UtapiMetric
? data
: new UtapiMetric(data);
// If this event has been filtered then exit early
if (!this._shouldPushMetric(metric)) {
return;
}
// Assign a uuid if one isn't passed
if (!metric.uuid) {
metric.uuid = uuid.v4();
@ -265,26 +242,12 @@ class UtapiClient {
metric.timestamp = new Date().getTime();
}
if (this._suppressedEventFields !== null) {
const filteredData = Object.entries(metric.getValue())
.filter(([key]) => !this._suppressedEventFields.includes(key))
.reduce((obj, [key, value]) => {
obj[key] = value;
return obj;
}, {});
metric = new UtapiMetric(filteredData);
}
try {
await this._pushToUtapi([metric]);
} catch (error) {
if (!this._disableRetryCache) {
this._logger.error('unable to push metric, adding to retry cache', { error });
if (!await this._addToRetryCache(metric)) {
throw new Error('unable to store metric');
}
} else {
this._logger.debug('unable to push metric. retry cache disabled, not retrying ingestion.', { error });
this._logger.error('unable to push metric, adding to retry cache', { error });
if (!await this._addToRetryCache(metric)) {
throw new Error('unable to store metric');
}
}
}
@ -296,34 +259,6 @@ class UtapiClient {
}
return this._pushMetric(data);
}
/**
* Get the storageUtilized of a resource
*
* @param {string} level - level of metrics, currently only 'accounts' is supported
* @param {string} resource - id of the resource
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - return a Promise if no callback is provided, undefined otherwise
*/
getStorage(level, resource, callback) {
if (level !== 'accounts') {
throw new Error('invalid level, only "accounts" is supported');
}
return asyncOrCallback(async () => {
const resp = await needle(
'get',
`${this._transport}://${this._host}:${this._port}/v2/storage/${level}/${resource}`,
this._tls,
);
if (resp.statusCode !== 200) {
throw new Error(`unable to retrieve metrics: ${resp.statusMessage}`);
}
return resp.body;
}, callback);
}
}
module.exports = UtapiClient;

View File

@ -5,60 +5,26 @@
"logLevel": "info",
"dumpLevel": "error"
},
"redis": {
"host": "127.0.0.1",
"port": 6379
},
"localCache": {
"host": "127.0.0.1",
"port": 6379
},
"warp10": {
"host": "127.0.0.1",
"port": 4802,
"nodeId": "single_node",
"requestTimeout": 60000,
"connectTimeout": 60000
"port": 4802
},
"healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"]
},
"vaultd": {
"host": "127.0.0.1",
"port": 8500
},
"cacheBackend": "memory",
"development": false,
"nodeId": "single_node",
"ingestionSchedule": "*/5 * * * * *",
"ingestionShardSize": 10,
"ingestionLagSeconds": 30,
"checkpointSchedule": "*/30 * * * * *",
"snapshotSchedule": "5 0 * * * *",
"repairSchedule": "0 */5 * * * *",
"reindexSchedule": "0 0 0 * * Sun",
"diskUsageSchedule": "0 */15 * * * *",
"bucketd": [ "localhost:9000" ],
"reindex": {
"enabled": true,
"schedule": "0 0 0 * * 6"
},
"diskUsage": {
"retentionDays": 45,
"expirationEnabled": false
},
"serviceUser": {
"arn": "arn:aws:iam::000000000000:user/scality-internal/service-utapi-user",
"enabled": false
},
"filter": {
"allow": {},
"deny": {}
},
"metrics" : {
"enabled": false,
"host": "localhost",
"ingestPort": 10902,
"checkpointPort": 10903,
"snapshotPort": 10904,
"diskUsagePort": 10905,
"reindexPort": 10906,
"repairPort": 10907
}
"snapshotSchedule": "* 0 * * * *",
"repairSchedule": "* */5 * * * *"
}

View File

@ -2,47 +2,27 @@ const fs = require('fs');
const path = require('path');
const Joi = require('@hapi/joi');
const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const {
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
} = require('../constants');
const { truthy, envNamespace } = require('../constants');
const configSchema = require('./schema');
// We need to require the specific file rather than the parent module to avoid a circular require
const { parseDiskSizeSpec } = require('../utils/disk');
function _splitTrim(char, text) {
return text.split(char).map(v => v.trim());
}
function _splitServer(text) {
assert.notStrictEqual(text.indexOf(':'), -1);
const [host, port] = _splitTrim(':', text);
const [host, port] = text.split(':').map(v => v.trim());
return {
host,
port: Number.parseInt(port, 10),
};
}
function _splitNode(text) {
assert.notStrictEqual(text.indexOf('='), -1);
const [nodeId, hostname] = _splitTrim('=', text);
return {
nodeId,
..._splitServer(hostname),
};
}
const _typeCasts = {
bool: val => truthy.has(val.toLowerCase()),
int: val => parseInt(val, 10),
list: val => _splitTrim(',', val),
serverList: val => _splitTrim(',', val).map(_splitServer),
nodeList: val => _splitTrim(',', val).map(_splitNode),
diskSize: parseDiskSizeSpec,
list: val => val.split(',').map(v => v.trim()),
serverList: val => val.split(',').map(v => v.trim()).map(_splitServer),
};
function _definedInEnv(key) {
return process.env[`${envNamespace}_${key}`] !== undefined;
}
@ -73,6 +53,7 @@ class Config {
constructor(overrides) {
this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this._defaultsPath = path.join(__dirname, 'defaults.json');
this.host = undefined;
this.port = undefined;
@ -85,46 +66,33 @@ class Config {
// read config automatically
const loadedConfig = this._loadConfig();
let parsedConfig = this._parseConfig(loadedConfig);
let parsedConfig = Config._parseConfig(loadedConfig);
if (typeof overrides === 'object') {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
}
Object.assign(this, parsedConfig);
werelogs.configure({
level: Config.logging.level,
dump: Config.logging.dumpLevel,
});
}
static _readFile(path, encoding = 'utf-8') {
try {
return fs.readFileSync(path, { encoding });
} catch (error) {
// eslint-disable-next-line no-console
console.error({ message: `error reading file at ${path}`, error });
throw error;
}
}
static _readJSON(path) {
const data = Config._readFile(path);
static _readFile(path) {
try {
const data = fs.readFileSync(path, {
encoding: 'utf-8',
});
return JSON.parse(data);
} catch (error) {
// eslint-disable-next-line no-console
console.error({ message: `error parsing JSON from file at ${path}`, error });
console.error({ message: `error reading config file at ${path}`, error });
throw error;
}
}
_loadDefaults() {
return defaults;
return Config._readFile(this._defaultsPath);
}
_loadUserConfig() {
return Joi.attempt(
Config._readJSON(this._configPath),
Config._readFile(this._configPath),
configSchema,
'invalid Utapi config',
);
@ -183,85 +151,45 @@ class Config {
return this._recursiveUpdateObject(defaultConf, userConf);
}
static _parseRedisConfig(prefix, config) {
const redisConf = {
retry: config.retry,
};
if (config.sentinels || _definedInEnv(`${prefix}_SENTINELS`)) {
redisConf.name = _loadFromEnv(`${prefix}_NAME`, config.name);
redisConf.sentinels = _loadFromEnv(
`${prefix}_SENTINELS`,
static _parseRedisConfig(config) {
const redisConf = {};
if (config.sentinels || _definedInEnv('REDIS_SENTINELS')) {
redisConf.name = _loadFromEnv('REDIS_NAME', config.name);
const sentinels = _loadFromEnv(
'REDIS_SENTINELS',
config.sentinels,
_typeCasts.serverList,
_typeCasts.list,
);
redisConf.sentinels = sentinels.map(v => {
if (typeof v === 'string') {
const [host, port] = v.split(':');
return { host, port: Number.parseInt(port, 10) };
}
return v;
});
redisConf.sentinelPassword = _loadFromEnv(
`${prefix}_SENTINEL_PASSWORD`,
'REDIS_SENTINEL_PASSWORD',
config.sentinelPassword,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
} else {
redisConf.host = _loadFromEnv(
`${prefix}_HOST`,
'REDIS_HOST',
config.host,
);
redisConf.port = _loadFromEnv(
`${prefix}_PORT`,
'REDIS_PORT',
config.port,
_typeCasts.int,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
'REDIS_PASSWORD',
config.password,
);
}
return redisConf;
}
_loadCertificates(config) {
const { key, cert, ca } = config;
const keyPath = path.isAbsolute(key) ? key : path.join(this._basePath, key);
const certPath = path.isAbsolute(cert) ? cert : path.join(this._basePath, cert);
const certs = {
cert: Config._readFile(certPath, 'ascii'),
key: Config._readFile(keyPath, 'ascii'),
};
if (ca) {
const caPath = path.isAbsolute(ca) ? ca : path.join(this._basePath, ca);
certs.ca = Config._readFile(caPath, 'ascii');
}
return certs;
}
static _parseResourceFilters(config) {
const resourceFilters = {};
allowedFilterFields.forEach(
field => allowedFilterStates.forEach(
state => {
const configResources = (config[state] && config[state][field]) || null;
const envVar = `FILTER_${field.toUpperCase()}_${state.toUpperCase()}`;
const resources = _loadFromEnv(envVar, configResources, _typeCasts.list);
if (resources) {
if (resourceFilters[field]) {
throw new Error('You can not define both an allow and a deny list for an event field.');
}
resourceFilters[field] = { [state]: new Set(resources) };
}
},
),
);
return resourceFilters;
}
_parseConfig(config) {
static _parseConfig(config) {
const parsedConfig = {};
parsedConfig.development = _loadFromEnv('DEV_MODE', config.development, _typeCasts.bool);
@ -280,41 +208,25 @@ class Config {
allowFrom: healthCheckFromEnv.concat(config.healthChecks.allowFrom),
};
const certPaths = {
cert: _loadFromEnv('TLS_CERT', config.certFilePaths.cert),
key: _loadFromEnv('TLS_KEY', config.certFilePaths.key),
ca: _loadFromEnv('TLS_CA', config.certFilePaths.ca),
};
if (certPaths.key && certPaths.cert) {
parsedConfig.tls = this._loadCertificates(certPaths);
} else if (certPaths.key || certPaths.cert) {
throw new Error('bad config: both certFilePaths.key and certFilePaths.cert must be defined');
}
parsedConfig.redis = Config._parseRedisConfig(config.redis);
parsedConfig.redis = Config._parseRedisConfig('REDIS', config.redis);
parsedConfig.cache = Config._parseRedisConfig('REDIS_CACHE', config.localCache);
parsedConfig.cache = Config._parseRedisConfig(config.localCache);
parsedConfig.cache.backend = _loadFromEnv('CACHE_BACKEND', config.cacheBackend);
const warp10Conf = {
readToken: _loadFromEnv('WARP10_READ_TOKEN', config.warp10.readToken),
writeToken: _loadFromEnv('WARP10_WRITE_TOKEN', config.warp10.writeToken),
requestTimeout: _loadFromEnv('WARP10_REQUEST_TIMEOUT', config.warp10.requestTimeout, _typeCasts.int),
connectTimeout: _loadFromEnv('WARP10_CONNECT_TIMEOUT', config.warp10.connectTimeout, _typeCasts.int),
};
if (Array.isArray(config.warp10.hosts) || _definedInEnv('WARP10_HOSTS')) {
warp10Conf.hosts = _loadFromEnv('WARP10_HOSTS', config.warp10.hosts, _typeCasts.nodeList);
} else {
warp10Conf.hosts = [{
host: _loadFromEnv('WARP10_HOST', config.warp10.host),
port: _loadFromEnv('WARP10_PORT', config.warp10.port, _typeCasts.int),
nodeId: _loadFromEnv('WARP10_NODE_ID', config.warp10.nodeId),
}];
}
parsedConfig.warp10 = warp10Conf;
if (Array.isArray(config.warp10.hosts) || _definedInEnv('WARP10_HOSTS')) {
warp10Conf.hosts = _loadFromEnv('WARP10_HOSTS', config.warp10.hosts, _typeCasts.serverList);
} else {
warp10Conf.host = _loadFromEnv('WARP10_HOST', config.warp10.host);
warp10Conf.port = _loadFromEnv('WARP10_PORT', config.warp10.port, _typeCasts.int);
}
parsedConfig.logging = {
level: parsedConfig.development
? 'debug'
@ -325,77 +237,17 @@ class Config {
),
};
parsedConfig.ingestionSchedule = _loadFromEnv('INGESTION_SCHEDULE', config.ingestionSchedule);
parsedConfig.checkpointSchedule = _loadFromEnv('CHECKPOINT_SCHEDULE', config.checkpointSchedule);
parsedConfig.snapshotSchedule = _loadFromEnv('SNAPSHOT_SCHEDULE', config.snapshotSchedule);
parsedConfig.repairSchedule = _loadFromEnv('REPAIR_SCHEDULE', config.repairSchedule);
parsedConfig.reindexSchedule = _loadFromEnv('REINDEX_SCHEDULE', config.reindexSchedule);
parsedConfig.diskUsageSchedule = _loadFromEnv('DISK_USAGE_SCHEDULE', config.diskUsageSchedule);
parsedConfig.ingestionLagSeconds = _loadFromEnv(
'INGESTION_LAG_SECONDS',
config.ingestionLagSeconds,
_typeCasts.int,
);
parsedConfig.ingestionShardSize = _loadFromEnv(
'INGESTION_SHARD_SIZE',
config.ingestionShardSize,
_typeCasts.int,
);
const diskUsage = {
path: _loadFromEnv('DISK_USAGE_PATH', (config.diskUsage || {}).path),
hardLimit: _loadFromEnv('DISK_USAGE_HARD_LIMIT', (config.diskUsage || {}).hardLimit),
retentionDays: _loadFromEnv(
'METRIC_RETENTION_PERIOD',
(config.diskUsage || {}).retentionDays, _typeCasts.int,
),
expirationEnabled: _loadFromEnv(
'METRIC_EXPIRATION_ENABLED',
(config.diskUsage || {}).expirationEnabled, _typeCasts.bool,
),
};
if (diskUsage.hardLimit !== undefined) {
diskUsage.hardLimit = parseDiskSizeSpec(diskUsage.hardLimit);
}
if (!diskUsage.path && diskUsage.hardLimit !== undefined) {
throw Error('You must specify diskUsage.path to monitor for disk usage');
} else if (diskUsage.path && diskUsage.hardLimit === undefined) {
throw Error('diskUsage.hardLimit must be specified');
} else if (diskUsage.expirationEnabled && diskUsage.retentionDays === undefined) {
throw Error('diskUsage.retentionDays must be specified');
}
diskUsage.enabled = diskUsage.path !== undefined;
parsedConfig.diskUsage = diskUsage;
parsedConfig.vaultd = {
host: _loadFromEnv('VAULT_HOST', config.vaultd.host),
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
};
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
parsedConfig.serviceUser = {
arn: _loadFromEnv('SERVICE_USER_ARN', config.serviceUser.arn),
enabled: _loadFromEnv('SERVICE_USER_ENABLED', config.serviceUser.enabled, _typeCasts.bool),
};
parsedConfig.filter = Config._parseResourceFilters(config.filter);
parsedConfig.metrics = {
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
};
return parsedConfig;
}

View File

@ -1,23 +1,9 @@
const Joi = require('@hapi/joi');
const { allowedFilterFields, allowedFilterStates } = require('../constants');
const backoffSchema = Joi.object({
min: Joi.number(),
max: Joi.number(),
deadline: Joi.number(),
jitter: Joi.number(),
factor: Joi.number(),
});
const redisRetrySchema = Joi.object({
connectBackoff: backoffSchema,
});
const redisServerSchema = Joi.object({
host: Joi.string(),
port: Joi.number(),
password: Joi.string().allow(''),
retry: redisRetrySchema,
});
const redisSentinelSchema = Joi.object({
@ -28,7 +14,6 @@ const redisSentinelSchema = Joi.object({
})),
password: Joi.string().default('').allow(''),
sentinelPassword: Joi.string().default('').allow(''),
retry: redisRetrySchema,
});
const warp10SingleHost = Joi.object({
@ -42,23 +27,16 @@ const warp10MultiHost = Joi.object({
hosts: Joi.array().items(Joi.object({
host: Joi.alternatives(Joi.string().hostname(), Joi.string().ip()),
port: Joi.number().port(),
nodeId: Joi.string(),
})),
readToken: Joi.string(),
writeToken: Joi.string(),
});
const tlsSchema = Joi.object({
key: Joi.string(),
cert: Joi.string(),
ca: Joi.string(),
});
Joi.array().items(warp10SingleHost);
const schema = Joi.object({
host: Joi.string(),
port: Joi.number().port(),
certFilePaths: tlsSchema.default({}),
workers: Joi.number(),
development: Joi.boolean(),
log: Joi.object({
@ -77,54 +55,15 @@ const schema = Joi.object({
host: Joi.string().hostname(),
port: Joi.number().port(),
}),
reindex: Joi.object({
enabled: Joi.boolean(),
schedule: Joi.string(),
}),
bucketd: Joi.array().items(Joi.string()),
expireMetrics: Joi.boolean(),
expireMetricsTTL: Joi.number(),
cacheBackend: Joi.string().valid('memory', 'redis'),
nodeId: Joi.string(),
ingestionSchedule: Joi.string(),
ingestionShardSize: Joi.number().greater(0),
ingestionLagSeconds: Joi.number().greater(0),
checkpointSchedule: Joi.string(),
snapshotSchedule: Joi.string(),
repairSchedule: Joi.string(),
reindexSchedule: Joi.string(),
diskUsageSchedule: Joi.string(),
diskUsage: Joi.object({
path: Joi.string(),
retentionDays: Joi.number().greater(0),
expirationEnabled: Joi.boolean(),
hardLimit: Joi.string(),
}),
serviceUser: Joi.object({
arn: Joi.string(),
enabled: Joi.boolean(),
}),
filter: Joi.object(allowedFilterStates.reduce(
(filterObj, state) => {
filterObj[state] = allowedFilterFields.reduce(
(stateObj, field) => {
stateObj[field] = Joi.array().items(Joi.string());
return stateObj;
}, {},
);
return filterObj;
}, {},
)),
metrics: {
enabled: Joi.boolean(),
host: Joi.string(),
ingestPort: Joi.number().port(),
checkpointPort: Joi.number().port(),
snapshotPort: Joi.number().port(),
diskUsagePort: Joi.number().port(),
reindexPort: Joi.number().port(),
repairPort: Joi.number().port(),
},
});
module.exports = schema;

View File

@ -19,23 +19,16 @@ const constants = {
'createBucket',
'deleteBucket',
'deleteBucketCors',
'deleteBucketEncryption',
'deleteBucketLifecycle',
'deleteBucketReplication',
'deleteBucketTagging',
'deleteBucketWebsite',
'deleteObject',
'deleteObjectTagging',
'getBucketAcl',
'getBucketCors',
'getBucketEncryption',
'getBucketLifecycle',
'getBucketLocation',
'getBucketNotification',
'getBucketObjectLock',
'getBucketReplication',
'getBucketVersioning',
'getBucketTagging',
'getBucketWebsite',
'getObject',
'getObjectAcl',
@ -51,23 +44,17 @@ const constants = {
'multiObjectDelete',
'putBucketAcl',
'putBucketCors',
'putBucketEncryption',
'putBucketLifecycle',
'putBucketNotification',
'putBucketObjectLock',
'putBucketReplication',
'putBucketVersioning',
'putBucketTagging',
'putBucketWebsite',
'putData',
'putDeleteMarkerObject',
'putObject',
'putObjectAcl',
'putObjectLegalHold',
'putObjectRetention',
'putObjectTagging',
'replicateDelete',
'replicateObject',
'replicateTags',
'uploadPart',
'uploadPartCopy',
],
@ -84,7 +71,6 @@ const constants = {
sizeDelta: 'sizeD',
incomingBytes: 'inB',
outgoingBytes: 'outB',
operations: 'ops',
},
indexedEventFields: [
'acc',
@ -98,30 +84,14 @@ const constants = {
buckets: 'bck',
},
warp10EventType: ':m:utapi/event:',
warp10RecordType: ':m:utapi/record:',
warp10ValueType: ':m:utapi/event:',
truthy,
shardIngestLagSecs: 30,
checkpointLagSecs: 300,
snapshotLagSecs: 900,
repairLagSecs: 5,
counterBaseValueExpiration: 86400, // 24hrs
keyVersionSplitter: String.fromCharCode(0),
migrationChunksize: 500,
migrationOpTranslationMap: {
listBucketMultipartUploads: 'listMultipartUploads',
},
ingestionOpTranslationMap: {
putDeleteMarkerObject: 'deleteObject',
},
expirationChunkDuration: 900000000, // 15 minutes in microseconds
allowedFilterFields: [
'operationId',
'location',
'account',
'user',
'bucket',
],
allowedFilterStates: ['allow', 'deny'],
legacyApiVersion: '2016-08-15',
currentApiVersion: '2020-09-01',
};
constants.operationToResponse = constants.operations

View File

@ -1,12 +1,16 @@
{
"AccessDenied": {
"code": 403,
"description": "Access Denied"
"description": "Access denied"
},
"InternalError": {
"code": 500,
"description": "The server encountered an internal error. Please retry the request."
},
"InvalidQueryParameter" : {
"code": 400,
"description": "The query string is malformed."
},
"InvalidUri": {
"code": 400,
"description": "The requested URI does not represent any resource on the server."
@ -30,13 +34,5 @@
"InvalidRequest": {
"code": 400,
"description": "Request validation error"
},
"FailedMigration": {
"code": 1000,
"description": "failed to migrate metrics"
},
"FailedCorrection": {
"code": 1001,
"description": "failed to correct migrated metric"
}
}

View File

@ -1,16 +0,0 @@
const bucketclient = require('bucketclient');
const { BucketClientInterface } = require('arsenal').storage.metadata.bucketclient;
const config = require('../config');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const params = {
bucketdBootstrap: config.bucketd,
https: config.tls,
};
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

View File

@ -1,141 +0,0 @@
/* eslint-disable no-restricted-syntax */
const arsenal = require('arsenal');
const async = require('async');
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = arsenal.constants;
const { BucketInfo } = arsenal.models;
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
const PAGE_SIZE = 1000;
async function _listingWrapper(bucket, params) {
return new Promise(
(resolve, reject) => metadata.listObject(
bucket,
params,
logger.newRequestLogger(),
(err, res) => {
if (err) {
reject(err);
return;
}
resolve(res);
},
),
);
}
function _listObject(bucket, prefix, hydrateFunc) {
const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
let gt;
return {
async* [Symbol.asyncIterator]() {
while (true) {
let res;
try {
// eslint-disable-next-line no-await-in-loop
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt });
} catch (error) {
moduleLogger.error('Error during listing', { error });
throw error;
}
for (const item of res) {
yield hydrateFunc ? hydrateFunc(item) : item;
}
if (res.length !== PAGE_SIZE) {
break;
}
gt = res[res.length - 1].key;
}
},
};
}
function listObjects(bucket) {
return _listObject(bucket, '', data => {
const { key, value } = data;
const [name, version] = key.split(keyVersionSplitter);
return {
name,
version,
value: JSON.parse(value),
};
});
}
function listBuckets() {
return _listObject(usersBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
async function listMPUs(bucket) {
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
return _listObject(mpuBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
function bucketExists(bucket) {
return new Promise((resolve, reject) => metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(),
err => {
if (err && (!err.is || !err.is.NoSuchBucket)) {
reject(err);
return;
}
resolve(err === null);
},
));
}
function getBucket(bucket) {
return new Promise((resolve, reject) => {
metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(), (err, data) => {
if (err) {
reject(err);
return;
}
resolve(BucketInfo.fromObj(data));
},
);
});
}
module.exports = {
listBuckets,
listObjects,
listMPUs,
bucketExists,
getBucket,
};

View File

@ -3,7 +3,6 @@ const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const { apiOperations } = require('../server/spec');
const ResponseContainer = require('./ResponseContainer');
const { httpRequestDurationSeconds } = require('../server/metrics');
const apiTags = Object.keys(apiOperations);
const apiOperationIds = Object.values(apiOperations)
@ -22,7 +21,6 @@ const contextSchema = {
logger: Joi.any(),
request: Joi.any(),
results: Joi.any(),
requestTimer: Joi.any(),
};
const RequestContextModel = buildModel('RequestContext', contextSchema);
@ -36,11 +34,7 @@ class RequestContext extends RequestContextModel {
const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation;
const requestTimer = tag !== 'internal'
? httpRequestDurationSeconds.startTimer({ action: operationId })
: null;
request.logger.logger.addDefaultFields({
request.logger.addDefaultFields({
tag,
operationId,
service: 'utapi',
@ -56,7 +50,6 @@ class RequestContext extends RequestContextModel {
encrypted,
results: new ResponseContainer(),
logger: request.logger,
requestTimer,
});
}

View File

@ -1,13 +0,0 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const recordSchema = {
timestamp: Joi.number(),
objectDelta: Joi.number(),
sizeDelta: Joi.number(),
incomingBytes: Joi.number(),
outgoingBytes: Joi.number(),
operations: Joi.object(),
};
module.exports = buildModel('UtapiRecord', recordSchema);

View File

@ -1,6 +1,5 @@
const BaseModel = require('./Base');
const UtapiMetric = require('./UtapiMetric');
const UtapiRecord = require('./UtapiRecord');
const RequestContext = require('./RequestContext');
const ResponseContainer = require('./ResponseContainer');
@ -9,5 +8,4 @@ module.exports = {
UtapiMetric,
RequestContext,
ResponseContainer,
UtapiRecord,
};

View File

@ -1,211 +0,0 @@
const EventEmitter = require('events');
const { callbackify, promisify } = require('util');
const IORedis = require('ioredis');
const { jsutil } = require('arsenal');
const BackOff = require('backo');
const { whilst } = require('async');
const errors = require('./errors');
const { LoggerContext } = require('./utils/log');
const { asyncOrCallback } = require('./utils/func');
const moduleLogger = new LoggerContext({
module: 'redis',
});
const COMMAND_TIMEOUT = 10000;
const CONNECTION_TIMEOUT = 30000;
/**
* Creates a new Redis client instance
* @param {object} conf - redis configuration
* @param {string} conf.host - redis host
* @param {number} conf.port - redis port
* @param {string} [conf.password] - redis password (optional)
* @param {string} [conf.sentinelPassword] - sentinel password (optional)
* @param {Array<Object>} conf.sentinels - sentinels
* @param {Werelogs.Logger} log - Werelogs logger
* @return {Redis} - Redis client instance
*/
class RedisClient extends EventEmitter {
constructor(options) {
super();
this._redisOptions = options;
this._redis = null;
// Controls the use of additional command timeouts
// Only use if connecting to a sentinel cluster
this._useTimeouts = options.sentinels !== undefined;
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
this._runningRedisProbe = null;
this._isConnected = false;
this._isReady = false;
}
connect(callback) {
this._initClient(false);
if (callback) {
process.nextTick(callback);
}
}
disconnect(callback) {
return asyncOrCallback(async () => {
if (this._useTimeouts) {
Object.values(this._inFlightTimeouts)
.forEach(clearTimeout);
}
if (this._redis !== null) {
await this._redis.quit();
this._redis = null;
}
}, callback);
}
get isReady() {
return this._isConnected && this._isReady;
}
_initClient(startProbe = true) {
moduleLogger.debug('initializing redis client');
if (this._redis !== null) {
this._redis.off('connect', this._onConnect);
this._redis.off('ready', this._onReady);
this._redis.off('error', this._onError);
this._redis.disconnect();
}
this._isConnected = false;
this._isReady = false;
this._redis = new IORedis(this._redisOptions);
this._redis.on('connect', this._onConnect.bind(this));
this._redis.on('ready', this._onReady.bind(this));
this._redis.on('error', this._onError.bind(this));
if (startProbe && this._runningRedisProbe === null) {
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
}
}
_probeRedis() {
if (this.isReady) {
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
clearInterval(this._runningRedisProbe);
this._runningRedisProbe = null;
} else {
moduleLogger.warn('redis client has failed to become ready, reinitializing');
this._initClient();
}
}
_onConnect() {
this._isConnected = true;
this.emit('connect');
}
_onReady() {
this._isReady = true;
this.emit('ready');
}
_onError(error) {
this._isReady = false;
moduleLogger.error('error connecting to redis', { error });
if (this.listenerCount('error') > 0) {
this.emit('error', error);
}
}
_createCommandTimeout() {
let timer;
let onTimeout;
const cancelTimeout = jsutil.once(() => {
clearTimeout(timer);
this.off('timeout', onTimeout);
this._inFlightTimeouts.delete(timer);
});
const timeout = new Promise((_, reject) => {
timer = setTimeout(this.emit.bind(this, 'timeout'), COMMAND_TIMEOUT);
this._inFlightTimeouts.add(timer);
onTimeout = () => {
moduleLogger.warn('redis command timed out');
cancelTimeout();
this._initClient();
reject(errors.OperationTimedOut);
};
this.once('timeout', onTimeout);
});
return { timeout, cancelTimeout };
}
async _call(asyncFunc) {
const start = Date.now();
const { connectBackoff } = this._redisOptions.retry || {};
const backoff = new BackOff(connectBackoff);
const timeoutMs = (connectBackoff || {}).deadline || 2000;
let retried = false;
return new Promise((resolve, reject) => {
whilst(
next => { // WARNING: test is asynchronous in `async` v3
if (!connectBackoff && !this.isReady) {
moduleLogger.warn('redis not ready and backoff is not configured');
}
process.nextTick(next, null, !!connectBackoff && !this.isReady);
},
next => {
retried = true;
if ((Date.now() - start) > timeoutMs) {
moduleLogger.error('redis still not ready after max wait, giving up', { timeoutMs });
return next(errors.InternalError.customizeDescription(
'redis client is not ready',
));
}
const backoffDurationMs = backoff.duration();
moduleLogger.error('redis not ready, retrying', { backoffDurationMs });
return setTimeout(next, backoffDurationMs);
},
err => {
if (err) {
return reject(err);
}
if (retried) {
moduleLogger.info('redis connection recovered', {
recoveryOverheadMs: Date.now() - start,
});
}
const funcPromise = asyncFunc(this._redis);
if (!this._useTimeouts) {
// If timeouts are disabled simply return the Promise
return resolve(funcPromise);
}
const { timeout, cancelTimeout } = this._createCommandTimeout();
try {
// timeout always rejects so we can just return
return resolve(Promise.race([funcPromise, timeout]));
} finally {
cancelTimeout();
}
},
);
});
}
call(func, callback) {
if (callback !== undefined) {
// If a callback is provided `func` is assumed to also take a callback
// and is converted to a promise using promisify
return callbackify(this._call.bind(this))(promisify(func), callback);
}
return this._call(func);
}
}
module.exports = RedisClient;

View File

@ -1,14 +0,0 @@
const { collectDefaultMetrics, register } = require('prom-client');
collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
async function prometheusMetrics(ctx) {
// eslint-disable-next-line no-param-reassign
ctx.results.statusCode = 200;
ctx.results.body = await register.metrics();
}
module.exports = prometheusMetrics;

View File

@ -1,63 +0,0 @@
const errors = require('../../../errors');
const { serviceToWarp10Label } = require('../../../constants');
const { clients: warp10Clients } = require('../../../warp10');
const { client: cache } = require('../../../cache');
const { now, iterIfError } = require('../../../utils');
/**
*
* @param {RequestContext} ctx - request context
* @param {object} params - request parameters
* @param {string} params.level - metric level
* @param {string} params.resource - Id of the requested resource
* @returns {Promise<undefined>} -
*/
async function getStorage(ctx, params) {
const { level, resource } = params;
if (level !== 'accounts') {
throw errors.BadRequest
.customizeDescription(`Unsupported level "${level}". Only "accounts" is currently supported`);
}
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
let storageUtilized;
if (base !== null) {
storageUtilized = counter + base;
} else {
const labelName = serviceToWarp10Label[params.level];
const labels = { [labelName]: resource };
const res = await iterIfError(warp10Clients, warp10 => {
const options = {
params: {
end: now(),
labels,
node: warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};
return warp10.exec(options);
}, error => ctx.logger.error('error while fetching metrics', { error }));
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { level, resource });
throw errors.InternalError;
}
const { sizeD: currentSize } = res.result[0];
await cache.updateAccountCounterBase(resource, currentSize);
storageUtilized = currentSize;
}
ctx.results.statusCode = 200;
ctx.results.body = {
storageUtilized: Math.max(storageUtilized, 0),
resource,
level,
};
}
module.exports = getStorage;

View File

@ -2,7 +2,6 @@ const errors = require('../../../errors');
const { UtapiMetric } = require('../../../models');
const { client: cacheClient } = require('../../../cache');
const { convertTimestamp } = require('../../../utils');
const { ingestionOpTranslationMap } = require('../../../constants');
async function ingestMetric(ctx, params) {
let metrics;
@ -10,7 +9,6 @@ async function ingestMetric(ctx, params) {
metrics = params.body.map(m => new UtapiMetric({
...m,
timestamp: convertTimestamp(m.timestamp),
operationId: ingestionOpTranslationMap[m.operationId] || m.operationId,
}));
} catch (error) {
throw errors.InvalidRequest;

View File

@ -1,7 +1,7 @@
const errors = require('../../../errors');
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
const { convertTimestamp, iterIfError } = require('../../../utils');
const { clients: warp10Clients } = require('../../../warp10');
const { convertTimestamp } = require('../../../utils');
const { client: warp10 } = require('../../../warp10');
const emptyOperationsResponse = Object.values(operationToResponse)
.reduce((prev, key) => {
@ -16,74 +16,36 @@ const metricResponseKeys = {
service: 'serviceName',
};
function positiveOrZero(value) {
return Math.max(value, 0);
}
async function listMetric(ctx, params) {
const labelName = serviceToWarp10Label[params.level];
const resources = params.body[params.level];
let [start, end] = params.body.timeRange;
if (end === undefined) {
end = Date.now();
}
let results;
try {
// A separate request will be made to warp 10 per requested resource
results = await Promise.all(
resources.map(async ({ resource, id }) => {
const labels = { [labelName]: id };
const res = await iterIfError(warp10Clients, warp10 => {
const options = {
params: {
start: convertTimestamp(start).toString(),
end: convertTimestamp(end).toString(),
labels,
node: warp10.nodeId,
},
macro: 'utapi/getMetrics',
};
return warp10.exec(options);
}, error => ctx.logger.error('error during warp 10 request', {
error,
requestParams: {
start,
end,
labels,
},
}));
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { resource, type: params.level });
throw errors.InternalError;
}
const rawMetrics = JSON.parse(res.result[0]);
// Due to various error cases it is possible for metrics in utapi to go negative.
// As this is nonsensical to the user we replace any negative values with zero.
const metrics = {
storageUtilized: rawMetrics.storageUtilized.map(positiveOrZero),
numberOfObjects: rawMetrics.numberOfObjects.map(positiveOrZero),
incomingBytes: positiveOrZero(rawMetrics.incomingBytes),
outgoingBytes: positiveOrZero(rawMetrics.outgoingBytes),
operations: rawMetrics.operations,
};
return {
resource,
metrics,
};
}),
);
} catch (error) {
ctx.logger.error('error fetching metrics from warp10', { error });
throw errors.InternalError;
}
const labelName = serviceToWarp10Label[params.resource];
const resources = params.body[params.resource];
const [start, end] = params.body.timeRange
.map(convertTimestamp)
.map(v => v.toString());
// A separate request will be made to warp 10 per requested resource
const results = await Promise.all(
resources.map(async resource => {
const labels = { [labelName]: resource };
const options = {
params: {
start,
end,
labels,
},
macro: 'utapi/getMetrics',
};
const res = await warp10.exec(options);
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { resource, type: params.resource });
throw errors.InternalError;
}
return {
resource,
metrics: JSON.parse(res.result[0]),
};
}),
);
// Convert the results from warp10 into the expected response format
const resp = results
@ -96,17 +58,23 @@ async function listMetric(ctx, params) {
const metric = {
...result.metrics,
timeRange: [start, end],
timeRange: params.body.timeRange,
operations: {
...emptyOperationsResponse,
...operations,
},
};
metric[metricResponseKeys[params.level]] = result.resource;
metric[metricResponseKeys[params.resource]] = result.resource;
return metric;
});
ctx.results.body = resp;
// Unwrap the response if only one resource is passed
if (resp.length === 1) {
[ctx.results.body] = resp;
} else {
ctx.results.body = resp;
}
ctx.results.statusCode = 200;
}

View File

@ -67,9 +67,6 @@ class APIController {
return Object.entries(apiOperationMiddleware[tag])
.reduce((handlers, [id, handler]) => {
const middleware = [];
if (handler.iplimit) {
middleware.push(utapiMiddleware.clientIpLimitMiddleware);
}
if (handler.authv4) {
middleware.push(utapiMiddleware.authV4Middleware);
}
@ -129,19 +126,19 @@ class APIController {
static async _callOperation(handler, request, response, params) {
try {
await handler(request.ctx, params);
} catch (error) {
request.logger.error('error during operation', { error });
throw error;
} catch (err) {
request.logger.error('error during operation', { err });
throw err;
}
request.logger.debug('writing operation result');
try {
await APIController._writeResult(request.ctx.results, response);
} catch (error) {
} catch (err) {
request.logger.error(
'error while writing operation result',
{ error },
{ err },
);
throw error;
throw err;
}
}

View File

@ -1,8 +1,6 @@
const http = require('http');
const https = require('https');
const express = require('express');
const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process');
const config = require('../config');
@ -10,6 +8,7 @@ const { initializeOasTools, middleware } = require('./middleware');
const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache');
const { LoggerContext } = require('../utils');
const LegacyServer = require('./legacy');
const moduleLogger = new LoggerContext({
module: 'server',
@ -26,32 +25,14 @@ class UtapiServer extends Process {
const app = express();
app.use(bodyParser.json({ strict: false }));
app.use(middleware.loggerMiddleware);
app.use(middleware.apiVersionMiddleware);
await initializeOasTools(spec, app);
app.use(middleware.errorMiddleware);
app.use(middleware.httpMetricsMiddleware);
app.use(middleware.responseLoggerMiddleware);
return app;
}
static _createHttpsAgent() {
const conf = {
ciphers: ciphers.ciphers,
dhparam: dhparam.dhparam,
cert: config.tls.cert,
key: config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null,
requestCert: false,
rejectUnauthorized: true,
};
const agent = new https.Agent(conf);
conf.agent = agent;
return conf;
}
static async _createServer(app) {
if (config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app);
}
return http.createServer(app);
}
@ -68,6 +49,7 @@ class UtapiServer extends Process {
async _setup() {
this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app);
LegacyServer.setup();
}
async _start() {

114
libV2/server/legacy.js Normal file
View File

@ -0,0 +1,114 @@
const url = require('url');
const config = require('../config');
const errors = require('../errors');
const routes = require('../../router/routes');
const Route = require('../../router/Route');
const Router = require('../../router/Router');
const redisClient = require('../../utils/redisClient');
const UtapiRequest = require('../../lib/UtapiRequest');
const Datastore = require('../../lib/Datastore');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'server.legacy',
});
/**
* Function to validate a URI component
*
* @param {string|object} component - path from url.parse of request.url
* (pathname plus query) or query from request
* @return {string|undefined} If `decodeURIComponent` throws an error,
* return the invalid `decodeURIComponent` string, otherwise return
* `undefined`
*/
function _checkURIComponent(component) {
if (typeof component === 'string') {
try {
decodeURIComponent(component);
} catch (err) {
return true;
}
} else {
return Object.keys(component).find(x => {
try {
decodeURIComponent(x);
decodeURIComponent(component[x]);
} catch (err) {
return true;
}
return false;
});
}
return undefined;
}
class LegacyServer {
constructor() {
this.router = null;
this.datastore = null;
}
setup() {
this.router = new Router(config);
routes.forEach(item => this.router.addRoute(new Route(item)));
const logger = moduleLogger.with({ component: 'redis' });
this.datastore = new Datastore().setClient(redisClient(config.redis, logger));
}
handleRequest(req, res, next) {
const { query, path, pathname } = url.parse(req.url, true);
// Sanity check for valid URI component
if (_checkURIComponent(query) || _checkURIComponent(path)) {
return next(errors.InvalidURI);
}
const utapiRequest = new UtapiRequest()
.setRequest(req)
.setLog(req.logger)
.setResponse(res)
.setDatastore(this.datastore)
.setRequestQuery(query)
.setRequestPath(path)
.setRequestPathname(pathname);
return this.router.doRoute(utapiRequest, (err, data) => {
if (err) {
// eslint-disable-next-line no-param-reassign
err.utapiError = true; // Make sure this error is returned as-is
next(err);
return;
}
const log = utapiRequest.getLog();
const res = utapiRequest.getResponse();
req.logger.trace('writing HTTP response', {
method: 'UtapiServer.response',
});
const code = utapiRequest.getStatusCode();
/*
* Encoding data to binary provides a hot path to write data
* directly to the socket, without node.js trying to encode the data
* over and over again.
*/
const payload = Buffer.from(JSON.stringify(data), 'utf8');
res.writeHead(code, {
'server': 'ScalityS3',
'x-scal-request-id': log.getSerializedUids(),
'content-type': 'application/json',
'content-length': payload.length,
});
res.write(payload);
res.end();
next();
});
}
}
module.exports = new LegacyServer();

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 's3_utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 's3_utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s
buckets: [0.0001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0],
});
module.exports = {
httpRequestDurationSeconds,
httpRequestsTotal,
};

View File

@ -1,18 +1,24 @@
const oasTools = require('oas-tools');
const path = require('path');
const { promisify } = require('util');
const { ipCheck } = require('arsenal');
const Joi = require('@hapi/joi');
const oasTools = require('oas-tools');
const werelogs = require('werelogs');
const config = require('../config');
const { logger, buildRequestLogger } = require('../utils');
const { legacyApiVersion, currentApiVersion } = require('../constants');
const errors = require('../errors');
const { translateAndAuthorize } = require('../vault');
const metricHandlers = require('./metrics');
const { buildRequestLogger } = require('../utils');
const { authenticateRequest } = require('../vault');
const LegacyServer = require('./legacy');
const oasLogger = new werelogs.Werelogs({
level: config.logging.level === 'trace' ? 'debug' : 'info', // oasTools is very verbose
dump: config.logging.dumpLevel,
});
const oasOptions = {
controllers: path.join(__dirname, './API/'),
checkControllers: true,
loglevel: config.logging.level === 'trace' ? 'debug' : 'info', // oasTools is very verbose
customLogger: logger,
customLogger: new oasLogger.Logger('Utapi'),
customErrorHandling: true,
strict: true,
router: true,
@ -33,11 +39,6 @@ if (config.development) {
};
}
async function initializeOasTools(spec, app) {
oasTools.configure(oasOptions);
return promisify(oasTools.initialize)(spec, app);
}
function loggerMiddleware(req, res, next) {
// eslint-disable-next-line no-param-reassign
req.logger = buildRequestLogger(req);
@ -45,29 +46,13 @@ function loggerMiddleware(req, res, next) {
return next();
}
function responseLoggerMiddleware(req, res, next) {
const info = {
httpCode: res.statusCode,
httpMessage: res.statusMessage,
};
req.logger.end('finished handling request', info);
if (next !== undefined) {
next();
}
}
function httpMetricsMiddleware(request, response, next) {
// If the request.ctx is undefined then this is an internal oasTools request (/_/docs)
// No metrics should be pushed
if (config.metrics.enabled && request.ctx && request.ctx.tag !== 'internal') {
metricHandlers.httpRequestsTotal
.labels({
action: request.ctx.operationId,
code: response.statusCode,
}).inc(1);
request.ctx.requestTimer({ code: response.statusCode });
}
if (next) {
next();
}
@ -76,70 +61,95 @@ function httpMetricsMiddleware(request, response, next) {
// next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) {
let statusCode = err.code || 500;
let code = err.message || 'InternalError';
let message = err.description || 'Internal Error';
let code = err.code || 500;
let message = err.message || 'Internal Error';
// failed request validation by oas-tools
if (err.failedValidation) {
// You can't actually use destructing here
/* eslint-disable prefer-destructuring */
statusCode = errors.InvalidRequest.code;
code = errors.InvalidRequest.message;
message = errors.InvalidRequest.description;
code = errors.InvalidRequest.code;
message = errors.InvalidRequest.message;
/* eslint-enable prefer-destructuring */
}
if (!err.utapiError && !config.development) {
// Make sure internal errors don't leak when not in development
code = 'InternalError';
message = 'Internal Error';
}
res.status(statusCode).send({
code,
message,
res.status(code).send({
error: {
code: code.toString(),
message,
},
});
responseLoggerMiddleware(req, res, () => httpMetricsMiddleware(req, res));
responseLoggerMiddleware(req, {
statusCode: code,
statusMessage: message,
});
}
const _versionFormat = Joi.string().pattern(/^\d{4}-\d{2}-\d{2}$/);
function apiVersionMiddleware(request, response, next) {
const apiVersion = request.query.Version;
if (!apiVersion) {
request.logger.debug('no api version specified, assuming latest');
next();
return;
}
try {
Joi.assert(apiVersion, _versionFormat);
} catch (err) {
request.logger.error('malformed Version parameter', { apiVersion });
next(errors.InvalidQueryParameter
.customizeDescription('The Version query parameter is malformed.'));
return;
}
if (apiVersion === legacyApiVersion) {
request.logger.debug('legacy api version specified routing to v1');
LegacyServer.handleRequest(request, response, err => {
if (err) {
return next(err);
}
responseLoggerMiddleware(request, response);
// next is purposefully not called as LegacyServer handles its own response
});
return;
}
if (apiVersion === currentApiVersion) {
request.logger.debug('latest api version specified routing to v2');
next();
return;
}
next(errors.InvalidQueryParameter
.customizeDescription('Invalid value for Version'));
}
async function initializeOasTools(spec, app) {
oasTools.configure(oasOptions);
return promisify(oasTools.initialize)(spec, app);
}
// eslint-disable-next-line no-unused-vars
async function authV4Middleware(request, response, params) {
const authHeader = request.headers.authorization;
if (!authHeader || !authHeader.startsWith('AWS4-')) {
request.logger.error('missing auth header for v4 auth');
request.log.error('missing auth header for v4 auth');
throw errors.InvalidRequest.customizeDescription('Must use Auth V4 for this request.');
}
let action = 'ListMetrics';
let requestedResources = [];
switch (request.ctx.operationId) {
case 'listMetrics':
requestedResources = params.body[params.level];
action = params.Action;
break;
default:
requestedResources = [params.resource];
break;
}
if (requestedResources.length === 0) {
throw errors.InvalidRequest.customizeDescription('You must specify at least one resource');
}
let passed;
let authorizedResources;
try {
[passed, authorizedResources] = await translateAndAuthorize(request, action, params.level, requestedResources);
[passed, authorizedResources] = await authenticateRequest(request, params);
} catch (error) {
request.logger.error('error during authentication', { error });
// rethrow any access denied errors
if ((error.is && error.is.AccessDenied) || (error.utapiError && error.AccessDenied)) {
throw error;
}
throw errors.InternalError;
}
@ -148,23 +158,8 @@ async function authV4Middleware(request, response, params) {
throw errors.AccessDenied;
}
switch (request.ctx.operationId) {
case 'listMetrics':
params.body[params.level] = authorizedResources;
break;
default:
[params.resource] = authorizedResources;
break;
}
}
async function clientIpLimitMiddleware(request) {
const allowIp = ipCheck.ipMatchCidrList(
config.healthChecks.allowFrom, request.ip,
);
if (!allowIp) {
throw errors.AccessDenied.customizeDescription('unauthorized origin ip on request');
if (authorizedResources !== undefined) {
params.body[params.resource.value] = authorizedResources;
}
}
@ -175,7 +170,6 @@ module.exports = {
errorMiddleware,
responseLoggerMiddleware,
authV4Middleware,
clientIpLimitMiddleware,
httpMetricsMiddleware,
apiVersionMiddleware,
},
};

View File

@ -55,10 +55,6 @@ function _getApiOperationMiddleware(routes) {
middleware.authv4 = true;
}
if (op['x-iplimit'] === true) {
middleware.iplimit = true;
}
optIds[tag][op.operationId] = middleware;
moduleLogger

View File

@ -1,12 +1,11 @@
const assert = require('assert');
const cron = require('node-schedule');
const cron = require('node-cron');
const cronparser = require('cron-parser');
const promClient = require('prom-client');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const config = require('../config');
const { client: cacheClient } = require('../cache');
const Process = require('../process');
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
const { LoggerContext } = require('../utils');
const { Warp10Client } = require('../warp10');
const logger = new LoggerContext({
module: 'BaseTask',
@ -17,104 +16,30 @@ class Now {}
class BaseTask extends Process {
constructor(options) {
super();
assert.notStrictEqual(options, undefined);
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
this._cache = cacheClient;
this._warp10Clients = options.warp10;
this._warp10 = new Warp10Client({
...config.warp10,
...((options && options.warp10) || {}),
});
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
this._enableMetrics = options.enableMetrics || false;
this._metricsHost = options.metricsHost || 'localhost';
this._metricsPort = options.metricsPort || 9001;
this._metricsHandlers = null;
this._probeServer = null;
this._nodeId = config.nodeId;
}
async _setup(includeDefaultOpts = true) {
if (includeDefaultOpts) {
this._program
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
.option(
'-s, --schedule <crontab>',
'Execute task using this crontab. Overrides configured schedule',
value => {
cronparser.parseExpression(value);
return value;
},
)
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
.option('-n, --node-id <id>', 'Set a custom node id');
}
if (this._enableMetrics) {
promClient.collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),
};
await this._createProbeServer();
}
}
_registerDefaultMetricHandlers() {
const taskName = this.constructor.name;
// Get the name of our subclass in snake case format eg BaseClass => _base_class
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `s3_utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_attempts_total`,
help: `Total number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_failures_total`,
help: `Total number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
return {
executionDuration,
executionAttempts,
executionFailures,
};
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
return {};
}
async _createProbeServer() {
this._probeServer = await startProbeServer({
bindAddress: this._metricsHost,
port: this._metricsPort,
});
this._probeServer.addHandler(
DEFAULT_METRICS_ROUTE,
(res, log) => {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': promClient.register.contentType,
});
promClient.register.metrics().then(metrics => {
res.end(metrics);
});
},
);
async _setup() {
this._program
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
.option(
'-s, --schedule <crontab>',
'Execute task using this crontab. Overrides configured schedule',
value => {
cronparser.parseExpression(value);
return value;
},
)
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
.option('-n, --node-id <id>', 'Set a custom node id');
}
get schedule() {
@ -134,6 +59,13 @@ class BaseTask extends Process {
return this._defaultLag;
}
get nodeId() {
if (this._program.nodeId) {
return this._program.nodeId;
}
return this._nodeId;
}
async _start() {
await this._cache.connect();
if (this.schedule === Now) {
@ -142,36 +74,26 @@ class BaseTask extends Process {
this.join();
});
} else {
this._scheduler = cron.scheduleJob(this.schedule,
this._scheduler = cron.schedule(this.schedule,
async () => {
this._scheduler.cancel(); // Halt execution to avoid overlapping tasks
this._scheduler.stop(); // Halt execution to avoid overlapping tasks
await this.execute();
this._scheduler.reschedule(this.schedule);
this._scheduler.start();
});
this.on('exit', () => {
this._scheduler.cancel();
this._scheduler.stop();
this._scheduler.destroy();
});
}
}
async execute() {
let endTimer;
if (this._enableMetrics) {
endTimer = this._metricsHandlers.executionDuration.startTimer();
this._metricsHandlers.executionAttempts.inc(1);
}
try {
const timestamp = new Date() * 1000; // Timestamp in microseconds;
const laggedTimestamp = timestamp - (this.lag * 1000000);
await this._execute(laggedTimestamp);
} catch (error) {
logger.error('Error during task execution', { error });
this._metricsHandlers.executionFailures.inc(1);
}
if (this._enableMetrics) {
endTimer();
}
}
@ -181,28 +103,8 @@ class BaseTask extends Process {
}
async _join() {
if (this._probeServer !== null) {
this._probeServer.stop();
}
return this._cache.disconnect();
}
withWarp10(func, onError) {
return iterIfError(this._warp10Clients, func, error => {
if (onError) {
onError(error);
} else {
const {
name, code, message, stack,
} = error;
logger.error('error during warp 10 request', {
error: {
name, code, errmsg: message, stack: name !== 'RequestError' ? stack : undefined,
},
});
}
});
}
}
module.exports = BaseTask;

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
@ -11,103 +10,30 @@ const logger = new LoggerContext({
class CreateCheckpoint extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.checkpointPort,
warp10: {
requestTimeout: 30000,
connectTimeout: 30000,
},
...options,
});
this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_checkpoint_created_total',
help: 'Total number of checkpoints created',
labelNames: ['origin', 'containerName'],
});
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 's3_utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastCheckpoint();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastCheckpoint,
};
}
/**
* Metrics for CreateCheckpoint
* @typedef {Object} CreateCheckpointMetrics
* @property {number} created - Number of checkpoints created
*/
/**
*
* @param {CreateCheckpointMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastCheckpoint() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.checkpoint.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/createCheckpoint',
};
return warp10.exec(params);
});
const params = {
params: {
nodeId: this.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/createCheckpoint',
};
const status = await this._warp10.exec(params);
if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { snapshotLagSecs } = require('../constants');
@ -11,103 +10,29 @@ const logger = new LoggerContext({
class CreateSnapshot extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.snapshotPort,
warp10: {
requestTimeout: 30000,
connectTimeout: 30000,
},
...options,
});
this._defaultSchedule = config.snapshotSchedule;
this._defaultLag = snapshotLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_snapshot_created_total',
help: 'Total number of snapshots created',
labelNames: ['origin', 'containerName'],
});
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 's3_utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastSnapshot();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastSnapshot,
};
}
/**
* Metrics for CreateSnapshot
* @typedef {Object} CreateSnapshotMetrics
* @property {number} created - Number of snapshots created
*/
/**
*
* @param {CreateSnapshotMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastSnapshot() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
},
macro: 'utapi/createSnapshot',
};
return warp10.exec(params);
});
const params = {
params: {
nodeId: this.nodeId,
end: timestamp.toString(),
},
macro: 'utapi/createSnapshot',
};
const status = await this._warp10.exec(params);
if (status.result[0]) {
logger.info(`created ${status.result[0]} snapshots`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -1,300 +0,0 @@
const async = require('async');
const Path = require('path');
const fs = require('fs');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { expirationChunkDuration } = require('../constants');
const {
LoggerContext, getFolderSize, formatDiskSize, sliceTimeRange,
} = require('../utils');
const moduleLogger = new LoggerContext({
module: 'MonitorDiskUsage',
path: config.diskUsage.path,
});
const WARN_THRESHOLD = 0.8;
const ACTION_THRESHOLD = 0.95;
class MonitorDiskUsage extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.diskUsagePort,
...options,
});
this._defaultSchedule = config.diskUsageSchedule;
this._defaultLag = 0;
this._path = config.diskUsage.path;
this._enabled = config.diskUsage.enabled;
this._expirationEnabled = config.diskUsage.expirationEnabled;
this._metricRetentionMicroSecs = config.diskUsage.retentionDays * 24 * 60 * 60 * 1000000;
this._hardLimit = config.diskUsage.hardLimit || null;
}
async _setup() {
await super._setup();
this._program
.option('--leader', 'Mark this process as the leader for metric expiration.')
.option(
'--lock',
'Manually trigger a lock of the warp 10 database. This will cause all other options to be ignored.',
)
.option(
'--unlock',
'Manually trigger an unlock of the warp 10 database. This will cause all other options to be ignored.',
);
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const isLocked = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_is_locked',
help: 'Indicates whether the monitored warp 10 has had writes disabled',
labelNames: ['origin', 'containerName'],
});
const leveldbBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10 leveldb',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'],
});
const hardLimitRatio = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_ratio',
help: 'Percent of the hard limit used by warp 10',
labelNames: ['origin', 'containerName'],
});
const hardLimitSetting = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_bytes',
help: 'The hard limit setting in bytes',
labelNames: ['origin', 'containerName'],
});
return {
isLocked,
leveldbBytes,
datalogBytes,
hardLimitRatio,
hardLimitSetting,
};
}
/**
* Metrics for MonitorDiskUsage
* @typedef {Object} MonitorDiskUsageMetrics
* @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10
* @property {number} leveldbBytes - Total bytes used by warp 10 leveldb
* @property {number} datalogBytes - Total bytes used by warp 10 datalog
* @property {number} hardLimitRatio - Percent of the hard limit used by warp 10
* @property {number} hardLimitSetting - The hard limit setting in bytes
*/
/**
*
* @param {MonitorDiskUsageMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.isLocked !== undefined) {
this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0);
}
if (metrics.leveldbBytes !== undefined) {
this._metricsHandlers.leveldbBytes.set(metrics.leveldbBytes);
}
if (metrics.datalogBytes !== undefined) {
this._metricsHandlers.datalogBytes.set(metrics.datalogBytes);
}
if (metrics.hardLimitRatio !== undefined) {
this._metricsHandlers.hardLimitRatio.set(metrics.hardLimitRatio);
}
if (metrics.hardLimitSetting !== undefined) {
this._metricsHandlers.hardLimitSetting.set(metrics.hardLimitSetting);
}
}
get isLeader() {
return this._program.leader !== undefined;
}
get isManualUnlock() {
return this._program.unlock !== undefined;
}
get isManualLock() {
return this._program.lock !== undefined;
}
// eslint-disable-next-line class-methods-use-this
async _getUsage(path) {
moduleLogger.debug(`calculating disk usage for ${path}`);
if (!fs.existsSync(path)) {
throw Error(`failed to calculate usage for non-existent path ${path}`);
}
return getFolderSize(path);
}
async _expireMetrics(timestamp) {
const resp = await this.withWarp10(async warp10 =>
warp10.exec({
macro: 'utapi/findOldestRecord',
params: {
class: '~.*',
labels: {},
},
}));
if (!resp.result || resp.result.length !== 1) {
moduleLogger.error('failed to fetch oldest record timestamp. expiration failed');
return;
}
const oldestTimestamp = resp.result[0];
if (oldestTimestamp === -1) {
moduleLogger.info('No records found, nothing to delete.');
return;
}
const endTimestamp = timestamp - this._metricRetentionMicroSecs;
if (oldestTimestamp > endTimestamp) {
moduleLogger.info('No records exceed retention period, nothing to delete.');
return;
}
await async.eachSeries(
sliceTimeRange(oldestTimestamp - 1, endTimestamp, expirationChunkDuration),
async ([start, end]) => {
moduleLogger.info('deleting metrics',
{ start, end });
return this.withWarp10(async warp10 =>
warp10.delete({
className: '~.*',
start,
end,
}));
},
);
}
_checkHardLimit(size, nodeId) {
const hardPercentage = parseFloat((size / this._hardLimit).toFixed(2));
const hardLimitHuman = formatDiskSize(this._hardLimit);
const hardLogger = moduleLogger.with({
size,
sizeHuman: formatDiskSize(size),
hardPercentage,
hardLimit: this._hardLimit,
hardLimitHuman,
nodeId,
});
this._pushMetrics({ hardLimitRatio: hardPercentage });
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
if (hardPercentage < WARN_THRESHOLD) {
hardLogger.debug(msg);
} else if (hardPercentage >= WARN_THRESHOLD && hardPercentage < ACTION_THRESHOLD) {
hardLogger.warn(msg);
} else {
hardLogger.error(msg);
return true;
}
return false;
}
async _disableWarp10Updates() {
return this.withWarp10(async warp10 =>
warp10.exec({
script: `
DROP DROP
'Hard limit has been reached. Further updates have been disabled.'
'scality'
UPDATEOFF`,
params: {},
}));
}
async _enableWarp10Updates() {
return this.withWarp10(async warp10 =>
warp10.exec({
script: "DROP DROP 'scality' UPDATEON",
params: {},
}));
}
async _execute(timestamp) {
if (this.isManualUnlock) {
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
await this._enableWarp10Updates();
this._pushMetrics({ isLocked: false });
return;
}
if (this.isManualLock) {
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates();
this._pushMetrics({ isLocked: true });
return;
}
if (this._expirationEnabled && this.isLeader) {
moduleLogger.info(`expiring metrics older than ${config.diskUsage.retentionDays} days`);
await this._expireMetrics(timestamp);
return;
}
if (!this._enabled) {
moduleLogger.debug('disk usage monitoring not enabled, skipping check');
return;
}
let leveldbBytes = null;
let datalogBytes = null;
try {
leveldbBytes = await this._getUsage(Path.join(this._path, 'leveldb'));
datalogBytes = await this._getUsage(Path.join(this._path, 'datalog'));
} catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return;
}
this._pushMetrics({ leveldbBytes, datalogBytes });
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 using ${formatDiskSize(size)} of disk space`, { leveldbBytes, datalogBytes });
const shouldLock = this._checkHardLimit(size, this.nodeId);
if (shouldLock) {
moduleLogger.warn('hard limit exceeded, disabling writes to warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates();
} else {
moduleLogger.info('usage below hard limit, ensuring writes to warp 10 are enabled',
{ nodeId: this.nodeId });
await this._enableWarp10Updates();
}
this._pushMetrics({ isLocked: shouldLock, hardLimitSetting: this._hardLimit });
}
}
}
module.exports = MonitorDiskUsage;

View File

@ -1,113 +1,24 @@
const assert = require('assert');
const async = require('async');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const config = require('../config');
const { checkpointLagSecs } = require('../constants');
const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock,
} = require('../utils');
const { shardIngestLagSecs, checkpointLagSecs } = require('../constants');
const logger = new LoggerContext({
module: 'IngestShard',
});
const now = () => convertTimestamp(new Date().getTime());
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.ingestPort,
...options,
});
constructor(...options) {
super(...options);
this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const ingestedTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_ingest_total',
help: 'Total number of metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedSlow = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_slow_total',
help: 'Total number of slow metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedShards = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_ingest_total',
help: 'Total number of metric shards ingested',
labelNames: ['origin', 'containerName'],
});
const shardAgeTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_age_total',
help: 'Total aggregated age of shards',
labelNames: ['origin', 'containerName'],
});
return {
ingestedTotal,
ingestedSlow,
ingestedShards,
shardAgeTotal,
};
}
/**
* Metrics for IngestShardTask
* @typedef {Object} IngestShardMetrics
* @property {number} ingestedTotal - Number of events ingested
* @property {number} ingestedSlow - Number of slow events ingested
* @property {number} ingestedShards - Number of metric shards ingested
* @property {number} shardAgeTotal - Aggregated age of shards
*/
/**
*
* @param {IngestShardMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.ingestedTotal !== undefined) {
this._metricsHandlers.ingestedTotal.inc(metrics.ingestedTotal);
}
if (metrics.ingestedSlow !== undefined) {
this._metricsHandlers.ingestedSlow.inc(metrics.ingestedSlow);
}
if (metrics.ingestedShards !== undefined) {
this._metricsHandlers.ingestedShards.inc(metrics.ingestedShards);
}
if (metrics.shardAgeTotal !== undefined) {
this._metricsHandlers.shardAgeTotal.inc(metrics.shardAgeTotal);
}
}
_hydrateEvent(data, stripTimestamp = false) {
const event = JSON.parse(data);
if (this._stripEventUUID) {
delete event.uuid;
}
if (stripTimestamp) {
delete event.timestamp;
}
return new UtapiMetric(event);
this._defaultLag = shardIngestLagSecs;
}
async _execute(timestamp) {
@ -124,61 +35,41 @@ class IngestShardTask extends BaseTask {
return;
}
let shardAgeTotal = 0;
let ingestedShards = 0;
await async.eachLimit(toIngest, 10,
await Promise.all(toIngest.map(
async shard => {
if (await this._cache.shardExists(shard)) {
const metrics = await this._cache.getMetricsForShard(shard);
if (metrics.length > 0) {
logger.info(`Ingesting ${metrics.length} events from shard`, { shard });
const shardAge = now() - shard;
const areSlowEvents = shardAge >= checkpointLagMicroseconds;
const metricClass = areSlowEvents ? 'utapi.repair.event' : 'utapi.event';
if (areSlowEvents) {
let metricClass;
let records;
if (shardAge < checkpointLagMicroseconds) {
metricClass = 'utapi.event';
records = metrics
.map(m => new UtapiMetric(JSON.parse(m)));
} else {
logger.info('Detected slow records, ingesting as repair');
metricClass = 'utapi.repair.event';
const clock = new InterpolatedClock();
records = metrics
.map(data => {
const metric = JSON.parse(data);
metric.timestamp = clock.getTs();
return new UtapiMetric(metric);
});
}
const records = metrics.map(m => this._hydrateEvent(m, areSlowEvents));
records.sort((a, b) => a.timestamp - b.timestamp);
const clock = new InterpolatedClock();
records.forEach(r => {
r.timestamp = clock.getTs(r.timestamp);
});
let ingestedIntoNodeId;
const status = await this.withWarp10(async warp10 => {
// eslint-disable-next-line prefer-destructuring
ingestedIntoNodeId = warp10.nodeId;
return warp10.ingest(
{
className: metricClass,
labels: { origin: config.nodeId },
}, records,
);
});
const status = await this._warp10.ingest(metricClass, records);
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
shardAgeTotal += shardAge;
ingestedShards += 1;
this._pushMetrics({ ingestedTotal: records.length });
if (areSlowEvents) {
this._pushMetrics({ ingestedSlow: records.length });
}
} else {
logger.debug('No events found in shard, cleaning up');
}
} else {
logger.warn('shard does not exist', { shard });
}
});
const shardAgeTotalSecs = shardAgeTotal / 1000000;
this._pushMetrics({ shardAgeTotal: shardAgeTotalSecs, ingestedShards });
},
));
}
}

View File

@ -1,85 +0,0 @@
const async = require('async');
const BaseTask = require('./BaseTask');
const UtapiClient = require('../client');
const { LoggerContext } = require('../utils');
const logger = new LoggerContext({
module: 'ManualAdjust',
});
function collectArgs(arg, prev) {
return prev.concat([arg]);
}
class ManualAdjust extends BaseTask {
async _setup() {
// Don't include default flags
await super._setup(false);
this._program
.option('-h, --host <host>', 'Utapi server host', 'localhost')
.option('-p, --port <port>', 'Utapi server port', '8100', parseInt)
.option('-b, --bucket <buckets...>', 'target these buckets', collectArgs, [])
.option('-a, --account <accounts...>', 'target these accounts', collectArgs, [])
.option('-u, --user <users...>', 'target these users', collectArgs, [])
.requiredOption('-o, --objects <adjustment>', 'adjust numberOfObjects by this amount', parseInt)
.requiredOption('-s, --storage <adjustment>', 'adjust storageUtilized by this amount', parseInt);
}
async _start() {
this._utapiClient = new UtapiClient({
host: this._program.host,
port: this._program.port,
disableRetryCache: true,
});
await super._start();
}
async _pushAdjustmentMetric(metric) {
logger.info('pushing adjustment metric', { metric });
await this._utapiClient.pushMetric(metric);
}
async _execute() {
const timestamp = Date.now();
const objectDelta = this._program.objects;
const sizeDelta = this._program.storage;
if (!this._program.bucket.length && !this._program.account.length && !this._program.user.length) {
throw Error('You must provided at least one of --bucket, --account or --user');
}
logger.info('writing adjustments');
if (this._program.bucket.length) {
logger.info('adjusting buckets');
await async.eachSeries(
this._program.bucket,
async bucket => this._pushAdjustmentMetric({
bucket, objectDelta, sizeDelta, timestamp,
}),
);
}
if (this._program.account.length) {
logger.info('adjusting accounts');
await async.eachSeries(
this._program.account,
async account => this._pushAdjustmentMetric({
account, objectDelta, sizeDelta, timestamp,
}),
);
}
if (this._program.user.length) {
logger.info('adjusting users');
await async.eachSeries(
this._program.user,
async user => this._pushAdjustmentMetric({
user, objectDelta, sizeDelta, timestamp,
}),
);
}
}
}
module.exports = ManualAdjust;

View File

@ -1,426 +0,0 @@
/* eslint-disable no-await-in-loop, no-restricted-syntax, no-loop-func */
const async = require('async');
const { jsutil } = require('arsenal');
const BaseTask = require('./BaseTask');
const { UtapiRecord } = require('../models');
const config = require('../config');
const errors = require('../errors');
const RedisClient = require('../redis');
const {
warp10RecordType,
operations: operationIds,
serviceToWarp10Label,
migrationOpTranslationMap,
} = require('../constants');
const {
LoggerContext,
now,
convertTimestamp,
comprehend,
} = require('../utils');
const REDIS_CHUNKSIZE = 50;
const WARP10_SCAN_SIZE = 100;
const logger = new LoggerContext({
module: 'MigrateTask',
});
function lowerCaseFirst(string) {
return string.charAt(0).toLowerCase() + string.slice(1);
}
const LEVELS_TO_MIGRATE = [
'buckets',
'users',
'accounts',
// TODO support service level metrics
// 'service',
];
class MigrateTask extends BaseTask {
constructor(options) {
super(options);
this._failedCorrections = [];
this._redis = new RedisClient(config.redis);
}
static _parseMetricValue(value) {
if (value.includes(':')) {
return parseInt(value.split(':')[0], 10);
}
return parseInt(value, 10);
}
static async* _iterStream(stream) {
let finished = false;
let data;
stream.on('end', () => { finished = true; });
stream.pause();
while (!finished) {
data = await new Promise(resolve => {
const _resolve = jsutil.once(resolve);
const end = () => _resolve([]);
stream.once('end', end);
stream.once('data', _data => {
stream.pause();
stream.off('end', end);
_resolve(_data);
});
stream.resume();
});
for (const item of data) {
yield item;
}
}
}
async* _iterResources(level) {
const redis = this._redis._redis;
const keys = MigrateTask._iterStream(redis.scanStream({
count: 100,
match: `s3:${level}:*:storageUtilized`,
}));
for await (const key of keys) {
yield key.split(':')[2];
}
}
async* _iterIntervalOperations(level, resource, timestamp) {
const redis = this._redis._redis;
const keys = MigrateTask._iterStream(
redis.scanStream({
count: 100,
match: `s3:${level}:${timestamp}:${resource}:*`,
}),
);
for await (const key of keys) {
const count = MigrateTask._parseMetricValue(await redis.get(key));
const op = lowerCaseFirst(key.split(':')[4]);
yield { op, count };
}
}
async* _iterSortedSet(key) {
let start = 0;
while (true) {
// zrange is inclusive
const end = start + REDIS_CHUNKSIZE - 1;
const results = await this._redis.call(async redis => redis.zrange(key, start, end, 'WITHSCORES'));
for (let x = 0; x < results.length - 1; x += 2) {
yield {
value: MigrateTask._parseMetricValue(results[x]),
score: MigrateTask._parseMetricValue(results[x + 1]),
};
}
if (results.length < REDIS_CHUNKSIZE) {
break;
}
start += REDIS_CHUNKSIZE;
}
}
async* _iterMetrics(level, resource) {
let storageUtilizedOffset = 0;
let numberOfObjectsOffset = 0;
for await (const entry of this._iterSortedSet(`s3:${level}:${resource}:storageUtilized`)) {
const { score: timestamp, value: storageUtilized } = entry;
const numberOfObjectsResp = await this._redis.call(redis => redis.zrangebyscore(
`s3:${level}:${resource}:numberOfObjects`,
timestamp,
timestamp,
));
let numberOfObjects;
if (numberOfObjectsResp.length === 1) {
numberOfObjects = MigrateTask._parseMetricValue(numberOfObjectsResp[0]);
} else {
numberOfObjects = numberOfObjectsOffset;
logger.warn('Could not retrieve value for numberOfObjects, falling back to last seen value',
{
metricLevel: level,
resource,
metricTimestamp: timestamp,
lastSeen: numberOfObjectsOffset,
});
}
let incomingBytes = 0;
let outgoingBytes = 0;
const operations = {};
for await (const apiOp of this._iterIntervalOperations(level, resource, timestamp)) {
if (apiOp.op === 'incomingBytes') {
incomingBytes = apiOp.count;
} else if (apiOp.op === 'outgoingBytes') {
outgoingBytes = apiOp.count;
} else if (operationIds.includes(apiOp.op)) {
operations[apiOp.op] = apiOp.count;
} else if (migrationOpTranslationMap[apiOp.op] !== undefined) {
operations[migrationOpTranslationMap[apiOp.op]] = apiOp.count;
} else {
logger.warn('dropping unknown operation', { apiOp });
}
}
yield {
timestamp: convertTimestamp(timestamp),
sizeDelta: storageUtilized - storageUtilizedOffset,
objectDelta: numberOfObjects - numberOfObjectsOffset,
incomingBytes,
outgoingBytes,
operations,
};
storageUtilizedOffset = storageUtilized;
numberOfObjectsOffset = numberOfObjects;
}
}
async _findLatestSnapshot(level, resource) {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot',
labels: {
[serviceToWarp10Label[level]]: resource,
},
start: 'now',
stop: -1,
}));
if (resp.result && (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
return result.v[0][0];
}
async _findOldestSnapshot(level, resource, beginTimestamp) {
let pos = beginTimestamp;
// eslint-disable-next-line no-constant-condition
while (true) {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot',
labels: {
[serviceToWarp10Label[level]]: resource,
},
start: pos - 1,
stop: -WARP10_SCAN_SIZE,
}));
if (resp.result && resp.result.length === 0) {
return pos;
}
const results = JSON.parse(resp.result[0]);
if (results.length === 0) {
return pos;
}
const { v: values } = results[0];
[pos] = values[values.length - 1];
}
}
async _migrateMetric(className, level, resource, metric) {
return this.withWarp10(async warp10 => warp10.ingest(
{
className,
labels: {
[serviceToWarp10Label[level]]: resource,
},
valueType: warp10RecordType,
},
[metric],
));
}
static _sumRecord(a, b) {
const objectDelta = (a.objectDelta || 0) + (b.objectDelta || 0);
const sizeDelta = (a.sizeDelta || 0) + (b.sizeDelta || 0);
const incomingBytes = (a.incomingBytes || 0) + (b.incomingBytes || 0);
const outgoingBytes = (a.outgoingBytes || 0) + (b.outgoingBytes || 0);
const operationKeys = new Set(Object.keys(a.operations || {}).concat(Object.keys(b.operations || {})));
// eslint-disable-next-line no-unused-vars
const operations = comprehend(Array.from(operationKeys), (_, key) => (
{
key,
value: ((a.operations || {})[key] || 0) + ((b.operations || {})[key] || 0),
}
));
return new UtapiRecord({
timestamp: b.timestamp || a.timestamp,
sizeDelta,
objectDelta,
incomingBytes,
outgoingBytes,
operations,
});
}
async _migrateResource(level, resource, ingest = true) {
logger.trace('migrating metrics for resource', { metricLevel: level, resource });
if (!ingest) {
logger.debug('ingestion is disabled, no records will be written', { metricLevel: level, resource });
}
const latestSnapshot = await this._findLatestSnapshot(level, resource);
const oldestSnapshot = latestSnapshot !== null
? await this._findOldestSnapshot(level, resource, latestSnapshot)
: null;
let correction = new UtapiRecord();
for await (const metric of this._iterMetrics(level, resource)) {
// Add metric to correction if it predates the latest snapshot
if (latestSnapshot !== null && metric.timestamp < latestSnapshot) {
correction = MigrateTask._sumRecord(correction, metric);
}
const _logger = logger.with({ metricLevel: level, resource, metricTimestamp: metric.timestamp });
if (ingest) {
let toIngest = new UtapiRecord(metric);
let className = 'utapi.checkpoint';
// Metric predates the oldest snapshot
if (oldestSnapshot !== null && metric.timestamp < oldestSnapshot) {
_logger.trace('ingesting metric as snapshot');
className = 'utapi.snapshot';
toIngest = new UtapiRecord({
...correction.getValue(),
timestamp: metric.timestamp,
});
// Metric in between oldest and latest snapshots
} else {
_logger.trace('ingesting metric as checkpoint');
}
await this._migrateMetric(className, level, resource, toIngest);
} else {
logger.trace('skipping ingestion of metric');
}
}
return correction;
}
async _migrateResourceLevel(level) {
const _logger = logger.with({ metricLevel: level });
_logger.debug('migrating metric level');
return async.eachLimit(this._iterResources(level), 5, async resource => {
let totals;
const migrated = await this._isMigrated(level, resource);
try {
totals = await this._migrateResource(level, resource, !migrated);
} catch (error) {
_logger.error('failed to migrate resource', { resource, error });
throw error;
}
if (!await this._markMigrated(level, resource)) {
const error = new Error('Failed to mark resource as migrated');
_logger.error('failed to migrate resource', { resource, error });
throw error;
}
const correction = new UtapiRecord({
...totals.getValue(),
timestamp: now(),
});
if (!await this._isCorrected(level, resource)) {
try {
_logger.debug('ingesting correction for metrics', { resource });
await this._migrateMetric('utapi.repair.event', level, resource, correction);
} catch (error) {
this._failedCorrections.push(correction);
_logger.error('error during correction', { resource, error });
throw errors.FailedMigration;
}
if (!await this._markCorrected(level, resource)) {
this._failedCorrections.push(correction);
const error = errors.FailedMigration.customizeDescription(
'Failed to mark resource as corrected,'
+ ' this can lead to inconsistencies if not manually corrected',
);
_logger.error('failed to migrate resource', { resource, error });
throw error;
}
} else {
_logger.trace('already marked as corrected, skipping correction', { resource });
}
});
}
async _getStatusKey(level, resource, stage) {
const key = `s3:migration:${level}:${resource}:${stage}`;
const res = await async.retry(3, async () => this._redis.call(redis => redis.get(key)));
return res === resource;
}
async _setStatusKey(level, resource, stage) {
const key = `s3:migration:${level}:${resource}:${stage}`;
try {
const res = await async.retry(3, async () => this._redis.call(redis => redis.set(key, resource)));
return res === 'OK';
} catch (error) {
logger.error('error setting migration status key', {
metricLevel: level, resource, key, error,
});
return false;
}
}
_isMigrated(level, resource) {
return this._getStatusKey(level, resource, 'migrated');
}
_markMigrated(level, resource) {
return this._setStatusKey(level, resource, 'migrated');
}
_isCorrected(level, resource) {
return this._getStatusKey(level, resource, 'corrected');
}
_markCorrected(level, resource) {
return this._setStatusKey(level, resource, 'corrected');
}
async _start() {
this._redis.connect();
return super._start();
}
async _join() {
await super._join();
await this._redis.disconnect();
}
async _execute() {
logger.debug('migrating account metrics to v2');
try {
await async.eachSeries(LEVELS_TO_MIGRATE, this._migrateResourceLevel.bind(this));
} catch (error) {
logger.error('migration failed with error', { error });
if (error.code === 1000) {
logger.info('This error is idempotent and migration can simply be restarted.');
} else if (error.code === 1001) {
logger.warn('This error requires manual correction before migration can be restarted.',
{ failedCorrections: this._failedCorrections });
}
}
}
}
module.exports = MigrateTask;

View File

@ -1,249 +0,0 @@
/* eslint-disable no-restricted-syntax */
const async = require('async');
const { mpuBucketPrefix } = require('arsenal').constants;
const BaseTask = require('./BaseTask');
const { UtapiRecord } = require('../models');
const config = require('../config');
const metadata = require('../metadata');
const { serviceToWarp10Label, warp10RecordType } = require('../constants');
const {
LoggerContext,
logEventFilter,
convertTimestamp,
buildFilterChain,
} = require('../utils');
const logger = new LoggerContext({
module: 'ReindexTask',
});
class ReindexTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.reindexPort,
...options,
});
this._defaultSchedule = config.reindexSchedule;
this._defaultLag = 0;
const eventFilters = (config && config.filter) || {};
this._shouldReindex = buildFilterChain((config && config.filter) || {});
// exponential backoff: max wait = 50 * 2 ^ 10 milliseconds ~= 51 seconds
this.ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters);
}
}
async _setup(includeDefaultOpts = true) {
await super._setup(includeDefaultOpts);
this._program.option(
'--bucket <bucket>',
'Manually specify a bucket to reindex. Can be used multiple times.',
(bucket, previous) => previous.concat([bucket]),
[],
);
}
static async _indexBucket(bucket) {
let size = 0;
let count = 0;
let lastMaster = null;
let lastMasterSize = null;
for await (const obj of metadata.listObjects(bucket)) {
if (obj.value.isDeleteMarker || obj.value.isPHD) {
// eslint-disable-next-line no-continue
continue;
}
if (!Number.isInteger(obj.value['content-length'])) {
logger.debug('object missing content-length, not including in count');
// eslint-disable-next-line no-continue
continue;
}
count += 1;
size += obj.value['content-length'];
// If versioned, subtract the size of the master to avoid double counting
if (lastMaster && obj.name === lastMaster) {
logger.debug('Detected versioned key. subtracting master size', { lastMasterSize, key: obj.name });
size -= lastMasterSize;
count -= 1;
lastMaster = null;
lastMasterSize = null;
// Only save master versions
} else if (!obj.version) {
lastMaster = obj.name;
lastMasterSize = obj.value['content-length'];
}
}
return { size, count };
}
static async _indexMpuBucket(bucket) {
if (await metadata.bucketExists(bucket)) {
return ReindexTask._indexBucket(bucket);
}
return { size: 0, count: 0 };
}
async _fetchCurrentMetrics(level, resource) {
const timestamp = convertTimestamp(new Date().getTime());
const res = await this.withWarp10(warp10 => {
const options = {
params: {
end: timestamp,
node: warp10.nodeId,
labels: {
[level]: resource,
},
// eslint-disable-next-line camelcase
no_reindex: true,
},
macro: 'utapi/getMetricsAt',
};
return warp10.exec(options);
});
const [value] = res.result || [];
if (!value) {
throw new Error('unable to fetch current metrics from warp10');
}
if (!Number.isInteger(value.objD) || !Number.isInteger(value.sizeD)) {
logger.error('invalid values returned from warp 10', { response: res });
throw new Error('invalid values returned from warp 10');
}
return {
timestamp,
value,
};
}
async _updateMetric(level, resource, total) {
const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
const objectDelta = total.count - value.objD;
const sizeDelta = total.size - value.sizeD;
if (objectDelta !== 0 || sizeDelta !== 0) {
logger.info('discrepancy detected in metrics. writing corrective record',
{ [level]: resource, objectDelta, sizeDelta });
const record = new UtapiRecord({
objectDelta,
sizeDelta,
timestamp,
});
await this.withWarp10(warp10 => warp10.ingest(
{
className: 'utapi.repair.reindex',
labels: {
[level]: resource,
},
valueType: warp10RecordType,
},
[record],
));
}
}
get targetBuckets() {
if (this._program.bucket.length) {
return this._program.bucket.map(name => ({ name }));
}
return metadata.listBuckets();
}
async _execute() {
logger.info('started reindex task');
const accountTotals = {};
const ignoredAccounts = new Set();
await async.eachLimit(this.targetBuckets, 5, async bucket => {
if (!this._shouldReindex({ bucket: bucket.name, account: bucket.account })) {
logger.debug('skipping excluded bucket', { bucket: bucket.name, account: bucket.account });
return;
}
logger.info('started bucket reindex', { bucket: bucket.name });
const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
let bktTotal;
let mpuTotal;
try {
bktTotal = await async.retryable(this.ebConfig, ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(this.ebConfig, ReindexTask._indexMpuBucket)(mpuBucket);
} catch (error) {
logger.error(
'failed bucket reindex. any associated account will be skipped',
{ error, bucket: bucket.name },
);
// buckets passed with `--bucket` won't have an account property
if (bucket.account) {
ignoredAccounts.add(bucket.account);
}
return;
}
const total = {
size: bktTotal.size + mpuTotal.size,
count: bktTotal.count,
};
// buckets passed with `--bucket` won't have an account property
if (bucket.account) {
if (accountTotals[bucket.account]) {
accountTotals[bucket.account].size += total.size;
accountTotals[bucket.account].count += total.count;
} else {
accountTotals[bucket.account] = { ...total };
}
}
logger.info('finished bucket reindex', { bucket: bucket.name });
try {
await this._updateMetric(
serviceToWarp10Label.buckets,
bucket.name,
total,
);
} catch (error) {
logger.error('error updating metrics for bucket', { error, bucket: bucket.name });
}
});
const toUpdate = Object.entries(accountTotals)
.filter(([account]) => !ignoredAccounts.has(account));
await async.eachLimit(toUpdate, 5, async ([account, total]) => {
try {
await this._updateMetric(
serviceToWarp10Label.accounts,
account,
total,
);
} catch (error) {
logger.error('error updating metrics for account', { error, account });
}
});
logger.info('finished reindex task');
}
}
module.exports = ReindexTask;

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
@ -11,67 +10,30 @@ const logger = new LoggerContext({
class RepairTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.repairPort,
warp10: {
requestTimeout: 30000,
connectTimeout: 30000,
},
...options,
});
this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_repair_task_created_total',
help: 'Total number of repair records created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
/**
* Metrics for RepairTask
* @typedef {Object} RepairMetrics
* @property {number} created - Number of repair records created
*/
/**
*
* @param {RepairMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
const status = await this.withWarp10(warp10 => {
const params = {
params: {
nodeId: warp10.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/repairRecords',
};
return warp10.exec(params);
});
const params = {
params: {
nodeId: this.nodeId,
end: timestamp.toString(),
fields: indexedEventFields,
},
macro: 'utapi/repairRecords',
};
const status = await this._warp10.exec(params);
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -3,10 +3,6 @@ const IngestShard = require('./IngestShard');
const CreateCheckpoint = require('./CreateCheckpoint');
const CreateSnapshot = require('./CreateSnapshot');
const RepairTask = require('./Repair');
const ReindexTask = require('./Reindex');
const MigrateTask = require('./Migrate');
const MonitorDiskUsage = require('./DiskUsage');
const ManualAdjust = require('./ManualAdjust');
module.exports = {
IngestShard,
@ -14,8 +10,4 @@ module.exports = {
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MigrateTask,
MonitorDiskUsage,
ManualAdjust,
};

View File

@ -1,53 +0,0 @@
const { promisify } = require('util');
const getFolderSize = require('get-folder-size');
const byteSize = require('byte-size');
const diskSpecRegex = /(\d+)([bkmgtpxz])(i?b)?/;
const suffixToExp = {
b: 0,
k: 1,
m: 2,
g: 3,
t: 4,
p: 5,
x: 6,
z: 7,
};
/**
* Converts a string specifying disk size into its value in bytes
* Supported formats:
* 1b/1B - Directly specify a byte size
* 1K/1MB/1GiB - Specify a number of bytes using IEC or common suffixes
*
* Suffixes are case insensitive.
* All suffixes are considered IEC standard with 1 kibibyte being 2^10 bytes.
*
* @param {String} spec - string for conversion
* @returns {Integer} - disk size in bytes
*/
function parseDiskSizeSpec(spec) {
const normalized = spec.toLowerCase();
if (!diskSpecRegex.test(normalized)) {
throw Error('Format does not match a known suffix');
}
const match = diskSpecRegex.exec(normalized);
const size = parseInt(match[1], 10);
const exponent = suffixToExp[match[2]];
return size * (1024 ** exponent);
}
function _formatFunc() {
return `${this.value}${this.unit}`;
}
function formatDiskSize(value) {
return byteSize(value, { units: 'iec', toStringFn: _formatFunc }).toString();
}
module.exports = {
parseDiskSizeSpec,
getFolderSize: promisify(getFolderSize),
formatDiskSize,
};

View File

@ -1,47 +0,0 @@
const assert = require('assert');
/**
* filterObject
*
* Constructs a function meant for filtering Objects by the value of a key
* Returned function returns a boolean with false meaning the object was present
* in the filter allowing the function to be passed directly to Array.filter etc.
*
* @param {string} key - Object key to inspect
* @param {Object} filter
* @param {Set} [filter.allow] - Set containing keys to include
* @param {Set} [filter.deny] - Set containing keys to not include
* @returns {function(Object): bool}
*/
function filterObject(obj, key, { allow, deny }) {
if (allow && deny) {
throw new Error('You can not define both an allow and a deny list.');
}
if (!allow && !deny) {
throw new Error('You must define either an allow or a deny list.');
}
if (allow) {
assert(allow instanceof Set);
return obj[key] === undefined || allow.has(obj[key]);
}
assert(deny instanceof Set);
return obj[key] === undefined || !deny.has(obj[key]);
}
/**
* buildFilterChain
*
* Constructs a function from a map of key names and allow/deny filters.
* The returned function returns a boolean with false meaning the object was present
* in one of the filters allowing the function to be passed directly to Array.filter etc.
*
* @param {Object<string, Object<string, Set>} filters
* @returns {function(Object): bool}
*/
function buildFilterChain(filters) {
return obj => Object.entries(filters).every(([key, filter]) => filterObject(obj, key, filter));
}
module.exports = { filterObject, buildFilterChain };

View File

@ -1,66 +0,0 @@
const { callbackify } = require('util');
/**
* Convenience function to handle "if no callback then return a promise" pattern
*
* @param {Function} asyncFunc - asyncFunction to call
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - returns a Promise if no callback is passed
*/
function asyncOrCallback(asyncFunc, callback) {
if (typeof callback === 'function') {
callbackify(asyncFunc)(callback);
return undefined;
}
return asyncFunc();
}
/**
*
* @param {Array|Object} data - data to reduce
* @param {function(string, [string])} func - Called with the index/key and value for each entry in the input
* Array or Object. Expected to return { key, value };
* @returns {Object} - Resulting object
*/
function comprehend(data, func) {
return Object.entries(data).reduce((prev, [key, value]) => {
const { key: _key, value: _value } = func(key, value);
prev[_key] = _value;
return prev;
}, {});
}
/**
* Calls func with items in sequence, advancing if an error is thrown.
* The result from the first successful call is returned.
*
* onError, if passed, is called on every error thrown by func;
*
* @param {Array} items - items to iterate
* @param {AsyncFunction} func - function to apply to each item
* @param {Function|undefined} onError - optional function called if an error is thrown
* @returns {*} -
*/
async function iterIfError(items, func, onError) {
let error;
// eslint-disable-next-line no-restricted-syntax
for (const item of items) {
try {
// eslint-disable-next-line no-await-in-loop
const resp = await func(item);
return resp;
} catch (_error) {
if (onError) {
onError(_error);
}
error = _error;
}
}
throw error || new Error('unable to complete request');
}
module.exports = {
asyncOrCallback,
comprehend,
iterIfError,
};

Some files were not shown because too many files have changed in this diff Show More