Compare commits

..

1 Commits

Author SHA1 Message Date
Taylor McKinnon 6628962b6b stash 2021-03-29 10:37:07 -07:00
122 changed files with 8285 additions and 5293 deletions

View File

@ -1,18 +1,8 @@
{ {
"extends": "scality", "extends": "scality",
"env": {
"es6": true
},
"parserOptions": {
"ecmaVersion": 9
},
"rules": { "rules": {
"no-underscore-dangle": "off", "no-underscore-dangle": "off",
"implicit-arrow-linebreak" : "off", "implicit-arrow-linebreak" : "off"
"import/extensions": 0,
"prefer-spread": 0,
"no-param-reassign": 0,
"array-callback-return": 0
}, },
"settings": { "settings": {
"import/resolver": { "import/resolver": {

View File

@ -1,87 +0,0 @@
# General support information
GitHub Issues are **reserved** for actionable bug reports (including
documentation inaccuracies), and feature requests.
**All questions** (regarding configuration, usecases, performance, community,
events, setup and usage recommendations, among other things) should be asked on
the **[Zenko Forum](http://forum.zenko.io/)**.
> Questions opened as GitHub issues will systematically be closed, and moved to
> the [Zenko Forum](http://forum.zenko.io/).
--------------------------------------------------------------------------------
## Avoiding duplicates
When reporting a new issue/requesting a feature, make sure that we do not have
any duplicates already open:
- search the issue list for this repository (use the search bar, select
"Issues" on the left pane after searching);
- if there is a duplicate, please do not open your issue, and add a comment
to the existing issue instead.
--------------------------------------------------------------------------------
## Bug report information
(delete this section (everything between the lines) if you're not reporting a
bug but requesting a feature)
### Description
Briefly describe the problem you are having in a few paragraphs.
### Steps to reproduce the issue
Please provide steps to reproduce, including full log output
### Actual result
Describe the results you received
### Expected result
Describe the results you expected
### Additional information
- Node.js version,
- Docker version,
- npm version,
- distribution/OS,
- optional: anything else you deem helpful to us.
--------------------------------------------------------------------------------
## Feature Request
(delete this section (everything between the lines) if you're not requesting
a feature but reporting a bug)
### Proposal
Describe the feature
### Current behavior
What currently happens
### Desired behavior
What you would like to happen
### Usecase
Please provide usecases for changing the current behavior
### Additional information
- Is this request for your company? Y/N
- If Y: Company name:
- Are you using any Scality Enterprise Edition products (RING, Zenko EE)? Y/N
- Are you willing to contribute this feature yourself?
- Position/Title:
- How did you hear about us?
--------------------------------------------------------------------------------

View File

@ -1,14 +0,0 @@
# Creating this image for the CI as GitHub Actions
# is unable to overwrite the entrypoint
ARG REDIS_IMAGE="redis:latest"
FROM ${REDIS_IMAGE}
ENV REDIS_LISTEN_PORT 6380
ENV REDIS_MASTER_HOST redis
ENV REDIS_MASTER_PORT_NUMBER 6379
ENTRYPOINT redis-server \
--port ${REDIS_LISTEN_PORT} \
--slaveof ${REDIS_MASTER_HOST} ${REDIS_MASTER_PORT_NUMBER}

View File

@ -1,7 +0,0 @@
FROM ghcr.io/scality/vault:c2607856
ENV VAULT_DB_BACKEND LEVELDB
RUN chmod 400 tests/utils/keyfile
ENTRYPOINT yarn start

View File

@ -1,65 +0,0 @@
name: build-ci-images
on:
workflow_call:
jobs:
warp10-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: warp10-ci
context: .
file: images/warp10/Dockerfile
lfs: true
redis-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-ci
context: .
file: images/redis/Dockerfile
redis-replica-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
needs:
- redis-ci
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-replica-ci
context: .github/docker/redis-replica
build-args: |
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
vault-ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to GitHub Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ github.token }}
- name: Build and push vault Image
uses: docker/build-push-action@v5
with:
push: true
context: .github/docker/vault
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
cache-from: type=gha,scope=vault
cache-to: type=gha,mode=max,scope=vault

View File

@ -1,16 +0,0 @@
name: build-dev-image
on:
push:
branches-ignore:
- 'development/**'
jobs:
build-dev:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}

View File

@ -1,39 +0,0 @@
name: release-warp10
on:
workflow_dispatch:
inputs:
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: true
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets: inherit
with:
name: warp10
context: .
file: images/warp10/Dockerfile
tag: ${{ github.event.inputs.tag }}
lfs: true
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
tag_name: ${{ github.event.inputs.tag }}-warp10
generate_release_notes: false
target_commitish: ${{ github.sha }}

View File

@ -1,45 +0,0 @@
name: release
on:
workflow_dispatch:
inputs:
dockerfile:
description: Dockerfile to build image from
type: choice
options:
- images/nodesvc-base/Dockerfile
- Dockerfile
required: true
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: false
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
context: .
file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }}
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }}
generate_release_notes: true
target_commitish: ${{ github.sha }}

View File

@ -1,361 +0,0 @@
---
name: tests
on:
push:
branches-ignore:
- 'development/**'
workflow_dispatch:
inputs:
debug:
description: Debug (enable the ability to SSH to runners)
type: boolean
required: false
default: 'false'
connection-timeout-m:
type: number
required: false
description: Timeout for ssh connection to worker (minutes)
default: 30
jobs:
build-ci:
uses: ./.github/workflows/build-ci.yaml
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: run static analysis tools on markdown
run: yarn run lint_md
- name: run static analysis tools on code
run: yarn run lint
tests-v1:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run unit tests
command: yarn test
env:
UTAPI_METRICS_ENABLED: 'true'
- name: run v1 client tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
env: {}
- name: run v1 server tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:server
env: {}
- name: run v1 cron tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:cron
env: {}
- name: run v1 interval tests
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:interval
env: {}
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
ports:
- 4802:4802
- 8082:8082
- 9718:9718
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
tests-v2-with-vault:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 for 60 seconds
run: sleep 60
- name: run v2 functional tests
run: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
tests-v2-without-sensision:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run v2 soft limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:softLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: run v2 hard limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:hardLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 a little bit
run: sleep 60
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}

View File

@ -1,31 +0,0 @@
FROM node:16.13.2-buster-slim
WORKDIR /usr/src/app
COPY package.json yarn.lock /usr/src/app/
RUN apt-get update \
&& apt-get install -y \
curl \
gnupg2
RUN curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
RUN apt-get update \
&& apt-get install -y jq git python3 build-essential yarn --no-install-recommends \
&& yarn cache clean \
&& yarn install --frozen-lockfile --production --ignore-optional --network-concurrency=1 \
&& apt-get autoremove --purge -y python3 git build-essential \
&& rm -rf /var/lib/apt/lists/* \
&& yarn cache clean \
&& rm -rf ~/.node-gyp \
&& rm -rf /tmp/yarn-*
# Keep the .git directory in order to properly report version
COPY . /usr/src/app
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
CMD [ "yarn", "start" ]
EXPOSE 8100

View File

@ -3,8 +3,9 @@
![Utapi logo](res/utapi-logo.png) ![Utapi logo](res/utapi-logo.png)
[![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi) [![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi)
[![Scality CI][badgepriv]](http://ci.ironmann.io/gh/scality/utapi)
Service Utilization API for tracking resource usage and metrics reporting. Service Utilization API for tracking resource usage and metrics reporting
## Design ## Design
@ -87,13 +88,13 @@ Server is running.
1. Create an IAM user 1. Create an IAM user
``` ```
aws iam --endpoint-url <endpoint> create-user --user-name <user-name> aws iam --endpoint-url <endpoint> create-user --user-name utapiuser
``` ```
2. Create access key for the user 2. Create access key for the user
``` ```
aws iam --endpoint-url <endpoint> create-access-key --user-name <user-name> aws iam --endpoint-url <endpoint> create-access-key --user-name utapiuser
``` ```
3. Define a managed IAM policy 3. Define a managed IAM policy
@ -202,11 +203,12 @@ Server is running.
5. Attach user to the managed policy 5. Attach user to the managed policy
``` ```
aws --endpoint-url <endpoint> iam attach-user-policy --user-name aws --endpoint-url <endpoint> iam attach-user-policy --user-name utapiuser
<user-name> --policy-arn <policy arn> --policy-arn <policy arn>
``` ```
Now the user has access to ListMetrics request in Utapi on all buckets. Now the user `utapiuser` has access to ListMetrics request in Utapi on all
buckets.
### Signing request with Auth V4 ### Signing request with Auth V4
@ -222,18 +224,16 @@ following urls for reference.
You may also view examples making a request with Auth V4 using various languages You may also view examples making a request with Auth V4 using various languages
and AWS SDKs [here](/examples). and AWS SDKs [here](/examples).
Alternatively, you can use a nifty command line tool available in Scality's Alternatively, you can use a nifty command line tool available in Scality's S3.
CloudServer.
You can git clone the CloudServer repo from here You can git clone S3 repo from here https://github.com/scality/S3.git and follow
https://github.com/scality/cloudserver and follow the instructions in the README the instructions in README to install the dependencies.
to install the dependencies.
If you have CloudServer running inside a docker container you can docker exec If you have S3 running inside a docker container you can docker exec into the S3
into the CloudServer container as container as
``` ```
docker exec -it <container-id> bash docker exec -it <container id> bash
``` ```
and then run the command and then run the command
@ -271,7 +271,7 @@ Usage: list_metrics [options]
-v, --verbose -v, --verbose
``` ```
An example call to list metrics for a bucket `demo` to Utapi in a https enabled A typical call to list metrics for a bucket `demo` to Utapi in a https enabled
deployment would be deployment would be
``` ```
@ -283,7 +283,7 @@ Both start and end times are time expressed as UNIX epoch timestamps **expressed
in milliseconds**. in milliseconds**.
Keep in mind, since Utapi metrics are normalized to the nearest 15 min. Keep in mind, since Utapi metrics are normalized to the nearest 15 min.
interval, start time and end time need to be in the specific format as follows. interval, so start time and end time need to be in specific format as follows.
#### Start time #### Start time
@ -297,7 +297,7 @@ Date: Tue Oct 11 2016 17:35:25 GMT-0700 (PDT)
Unix timestamp (milliseconds): 1476232525320 Unix timestamp (milliseconds): 1476232525320
Here's an example JS method to get a start timestamp Here's a typical JS method to get start timestamp
```javascript ```javascript
function getStartTimestamp(t) { function getStartTimestamp(t) {
@ -317,7 +317,7 @@ seconds and milliseconds set to 59 and 999 respectively. So valid end timestamps
would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and
`09:59:59:999`. `09:59:59:999`.
Here's an example JS method to get an end timestamp Here's a typical JS method to get end timestamp
```javascript ```javascript
function getEndTimestamp(t) { function getEndTimestamp(t) {
@ -342,3 +342,4 @@ In order to contribute, please follow the
https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md). https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md).
[badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg [badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg
[badgepriv]: http://ci.ironmann.io/gh/scality/utapi.svg?style=svg

View File

@ -1,276 +0,0 @@
#! /usr/bin/env node
// TODO
// - deduplicate with Vault's seed script at https://github.com/scality/Vault/pull/1627
// - add permission boundaries to user when https://scality.atlassian.net/browse/VAULT-4 is implemented
const { errors } = require('arsenal');
const program = require('commander');
const werelogs = require('werelogs');
const async = require('async');
const { IAM } = require('aws-sdk');
const { version } = require('../package.json');
const systemPrefix = '/scality-internal/';
function generateUserPolicyDocument() {
return {
Version: '2012-10-17',
Statement: {
Effect: 'Allow',
Action: 'utapi:ListMetrics',
Resource: 'arn:scality:utapi:::*/*',
},
};
}
function createIAMClient(opts) {
return new IAM({
endpoint: opts.iamEndpoint,
});
}
function needsCreation(v) {
if (Array.isArray(v)) {
return !v.length;
}
return !v;
}
class BaseHandler {
constructor(serviceName, iamClient, log) {
this.serviceName = serviceName;
this.iamClient = iamClient;
this.log = log;
}
applyWaterfall(values, done) {
this.log.debug('applyWaterfall', { values, type: this.resourceType });
const v = values[this.resourceType];
if (needsCreation(v)) {
this.log.debug('creating', { v, type: this.resourceType });
return this.create(values)
.then(res =>
done(null, Object.assign(values, {
[this.resourceType]: res,
})))
.catch(done);
}
this.log.debug('conflicts check', { v, type: this.resourceType });
if (this.conflicts(v)) {
return done(errors.EntityAlreadyExists.customizeDescription(
`${this.resourceType} ${this.serviceName} already exists and conflicts with the expected value.`));
}
this.log.debug('nothing to do', { v, type: this.resourceType });
return done(null, values);
}
}
class UserHandler extends BaseHandler {
get resourceType() {
return 'user';
}
collect() {
return this.iamClient.getUser({
UserName: this.serviceName,
})
.promise()
.then(res => res.User);
}
create(allResources) {
return this.iamClient.createUser({
UserName: this.serviceName,
Path: systemPrefix,
})
.promise()
.then(res => res.User);
}
conflicts(u) {
return u.Path !== systemPrefix;
}
}
class PolicyHandler extends BaseHandler {
get resourceType() {
return 'policy';
}
collect() {
return this.iamClient.listPolicies({
MaxItems: 100,
OnlyAttached: false,
Scope: 'All',
})
.promise()
.then(res => res.Policies.find(p => p.PolicyName === this.serviceName));
}
create(allResources) {
const doc = generateUserPolicyDocument();
return this.iamClient.createPolicy({
PolicyName: this.serviceName,
PolicyDocument: JSON.stringify(doc),
Path: systemPrefix,
})
.promise()
.then(res => res.Policy);
}
conflicts(p) {
return p.Path !== systemPrefix;
}
}
class PolicyAttachmentHandler extends BaseHandler {
get resourceType() {
return 'policyAttachment';
}
collect() {
return this.iamClient.listAttachedUserPolicies({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AttachedPolicies)
}
create(allResources) {
return this.iamClient.attachUserPolicy({
PolicyArn: allResources.policy.Arn,
UserName: this.serviceName,
})
.promise();
}
conflicts(p) {
return false;
}
}
class AccessKeyHandler extends BaseHandler {
get resourceType() {
return 'accessKey';
}
collect() {
return this.iamClient.listAccessKeys({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AccessKeyMetadata)
}
create(allResources) {
return this.iamClient.createAccessKey({
UserName: this.serviceName,
})
.promise()
.then(res => res.AccessKey);
}
conflicts(a) {
return false;
}
}
function collectResource(v, done) {
v.collect()
.then(res => done(null, res))
.catch(err => {
if (err.code === 'NoSuchEntity') {
return done(null, null);
}
done(err);
});
}
function collectResourcesFromHandlers(handlers, cb) {
const tasks = handlers.reduce((acc, v) => ({
[v.resourceType]: done => collectResource(v, done),
...acc,
}), {});
async.parallel(tasks, cb);
}
function buildServiceUserHandlers(serviceName, client, log) {
return [
UserHandler,
PolicyHandler,
PolicyAttachmentHandler,
AccessKeyHandler,
].map(h => new h(serviceName, client, log));
}
function apply(client, serviceName, log, cb) {
const handlers = buildServiceUserHandlers(serviceName, client, log);
async.waterfall([
done => collectResourcesFromHandlers(handlers, done),
...handlers.map(h => h.applyWaterfall.bind(h)),
(values, done) => done(null, values.accessKey),
], cb);
}
function wrapAction(actionFunc, serviceName, options) {
werelogs.configure({
level: options.logLevel,
dump: options.logDumpLevel,
});
const log = new werelogs.Logger(process.argv[1]).newRequestLogger();
const client = createIAMClient(options);
actionFunc(client, serviceName, log, (err, data) => {
if (err) {
log.error('failed', {
data,
error: err,
});
if (err.EntityAlreadyExists) {
log.error(`run "${process.argv[1]} purge ${serviceName}" to fix.`);
}
process.exit(1);
}
log.info('success', { data });
process.exit();
});
}
program.version(version);
[
{
name: 'apply <service-name>',
actionFunc: apply,
},
].forEach(cmd => {
program
.command(cmd.name)
.option('--iam-endpoint <url>', 'IAM endpoint', 'http://localhost:8600')
.option('--log-level <level>', 'log level', 'info')
.option('--log-dump-level <level>', 'log level that triggers a dump of the debug buffer', 'error')
.action(wrapAction.bind(null, cmd.actionFunc));
});
const validCommands = program.commands.map(n => n._name);
// Is the command given invalid or are there too few arguments passed
if (!validCommands.includes(process.argv[2])) {
program.outputHelp();
process.stdout.write('\n');
process.exit(1);
} else {
program.parse(process.argv);
}

25
bin/utapi.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env node
const Process = require('../libV2/process');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({ module: 'entrypoint' });
const utapiProcess = new Process();
utapiProcess.setup().then(
() => {
logger.info('Utapi setup completed, starting...');
return utapiProcess.start();
},
async error => {
logger.error(`Utapi encountered an unexpected error during setup ${ error.message }`, );
await utapiProcess.join(1);
},
).then(
() => logger.info('Utapi started'),
async error => {
logger.error(`Utapi encountered an error during startup: ${error.message}`);
await utapiProcess.join(1);
},
);

View File

@ -1,75 +0,0 @@
version: '3.8'
x-models:
warp10: &warp10
build:
context: .
dockerfile: ./images/warp10/Dockerfile
volumes: [ $PWD/warpscript:/usr/local/share/warpscript ]
warp10_env: &warp10_env
ENABLE_WARPSTUDIO: 'true'
ENABLE_SENSISION: 'true'
warpscript.repository.refresh: 1000
warpscript.maxops: 1000000000
warpscript.maxops.hard: 1000000000
warpscript.maxfetch: 1000000000
warpscript.maxfetch.hard: 1000000000
warpscript.extension.debug: io.warp10.script.ext.debug.DebugWarpScriptExtension
warpscript.maxrecursion: 1000
warpscript.repository.directory: /usr/local/share/warpscript
warpscript.extension.logEvent: io.warp10.script.ext.logging.LoggingWarpScriptExtension
redis: &redis
build:
context: .
dockerfile: ./images/redis/Dockerfile
services:
redis-0:
image: redis:7.2.4
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:7.2.4
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:7.2.4
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379
logfile ""
dir /tmp
sentinel announce-ip ${EXTERNAL_HOST}
sentinel announce-port 16379
sentinel monitor scality-s3 "${EXTERNAL_HOST}" 6379 1
EOF
redis-sentinel /tmp/sentinel.conf'
environment:
- HOST_IP="${EXTERNAL_HOST}"
ports:
- 16379:16379
warp10:
<< : *warp10
environment:
<< : *warp10_env
ports:
- 4802:4802
- 8081:8081
- 9718:9718
volumes:
- /tmp/warp10:/data
- '${PWD}/warpscript:/usr/local/share/warpscript'

View File

@ -1,47 +0,0 @@
#!/bin/bash
# set -e stops the execution of a script if a command or pipeline has an error
set -e
# modifying config.json
JQ_FILTERS_CONFIG="."
if [[ "$LOG_LEVEL" ]]; then
if [[ "$LOG_LEVEL" == "info" || "$LOG_LEVEL" == "debug" || "$LOG_LEVEL" == "trace" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .log.logLevel=\"$LOG_LEVEL\""
echo "Log level has been modified to $LOG_LEVEL"
else
echo "The log level you provided is incorrect (info/debug/trace)"
fi
fi
if [[ "$WORKERS" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .workers=\"$WORKERS\""
fi
if [[ "$REDIS_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.host=\"$REDIS_HOST\""
fi
if [[ "$REDIS_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.port=\"$REDIS_PORT\""
fi
if [[ "$VAULTD_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.host=\"$VAULTD_HOST\""
fi
if [[ "$VAULTD_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.port=\"$VAULTD_PORT\""
fi
if [[ "$HEALTHCHECKS_ALLOWFROM" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .healthChecks.allowFrom=[\"$HEALTHCHECKS_ALLOWFROM\"]"
fi
if [[ $JQ_FILTERS_CONFIG != "." ]]; then
jq "$JQ_FILTERS_CONFIG" config.json > config.json.tmp
mv config.json.tmp config.json
fi
exec "$@"

View File

@ -1,42 +0,0 @@
# Utapi Release Plan
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
* Namespace: ghcr.io/scality/utapi
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
This allows those images to be used by developers, CI builds,
build chain and so on.
Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
```
## Release Process
To release a production image:
* Name the tag for the repository and Docker image.
* Use the `yarn version` command with the same tag to update `package.json`.
* Create a PR and merge the `package.json` change.
* Tag the repository using the same tag.
* [Force a build] using:
* A given branch that ideally matches the tag.
* The `release` stage.
* An extra property with the name `tag` and its value being the actual tag.
[Force a build]:
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force

181
eve/main.yml Normal file
View File

@ -0,0 +1,181 @@
---
version: 0.2
branches:
default:
stage: pre-merge
models:
- Git: &clone
name: Pull repo
repourl: '%(prop:git_reference)s'
shallow: True
retryFetch: True
haltOnFailure: True
- Workspace: &workspace
type: kube_pod
path: eve/workers/pod.yml
images:
aggressor:
context: '.'
dockerfile: eve/workers/unit_and_feature_tests/Dockerfile
warp10:
context: '.'
dockerfile: 'images/warp10/Dockerfile'
vault: eve/workers/mocks/vault
- Upload: &upload_artifacts
source: /artifacts
urls:
- "*"
stages:
pre-merge:
worker:
type: local
steps:
- TriggerStages:
name: trigger all the tests
stage_names:
- linting-coverage
- run-unit-tests
- run-client-tests
- run-server-tests
- run-cron-tests
- run-interval-tests
- run-v2-functional-tests
- run-v2-limit-tests
linting-coverage:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run static analysis tools on markdown
command: yarn run lint_md
- ShellCommand:
name: run static analysis tools on code
command: yarn run lint
run-unit-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run unit tests
command: yarn test
run-client-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run client tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:client.log"
follow: true
run-server-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run server tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:server.log"
follow: true
run-cron-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run cron tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:cron.log"
follow: true
run-interval-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand:
name: run interval tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:interval.log"
follow: true
run-v2-functional-tests:
worker:
<< : *workspace
vars:
vault: enabled
steps:
- Git: *clone
- ShellCommand:
name: Wait for Warp 10
command: |
bash -c "
set -ex
bash tests/utils/wait_for_local_port.bash 4802 60"
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
- ShellCommand:
name: run v2 functional tests
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
utapi:
filename: "/artifacts/setup_ft_test:v2.log"
follow: true
run-v2-limit-tests:
worker:
<< : *workspace
vars:
vault: enabled
steps:
- Git: *clone
- ShellCommand:
name: Wait for Warp 10
command: |
bash -c "
set -ex
bash tests/utils/wait_for_local_port.bash 4802 60"
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
- ShellCommand:
name: run v2 soft limit tests
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:softLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
utapi:
filename: "/artifacts/setup_ft_test:softLimit.log"
- ShellCommand:
name: run v2 hard limit tests
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:hardLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
logfiles:
warp10:
filename: "/artifacts/warp10.log"
follow: true
utapi:
filename: "/artifacts/setup_ft_test:hardLimit.log"
follow: true
- Upload: *upload_artifacts

View File

@ -0,0 +1,7 @@
FROM node:alpine
ADD ./vault.js /usr/share/src/
WORKDIR /usr/share/src/
CMD node vault.js

View File

@ -0,0 +1,66 @@
const http = require('http');
const url = require('url');
const port = process.env.VAULT_PORT || 8500;
const unauthResp = {
ErrorResponse: {
$: {
xmlns: 'https://iam.amazonaws.com/doc/2010-05-08/',
},
Error: {
Code: 'InvalidAccessKeyId',
Message: 'The AWS access key Id you provided does not exist in our records.',
},
RequestId: '97f22e2dba45bca2a5cd:fb375c22ed4ea7500691',
},
};
class Vault {
constructor() {
this._server = null;
}
static _onRequest(req, res) {
const { query } = url.parse(req.url, true);
if (query.accessKey === 'invalidKey') {
res.writeHead(403);
res.write(JSON.stringify(unauthResp));
} else if (query.Action === 'AccountsCanonicalIds') {
res.writeHead(200);
let body;
if (Array.isArray(query.accountIds)) {
body = query.accountIds.map(id => ({
accountId: id,
canonicalId: id.split(':')[1],
}));
} else {
body = [{
accountId: query.accountIds,
canonicalId: query.accountIds.split(':')[1],
}];
}
res.write(JSON.stringify(body));
}
return res.end();
}
start() {
this._server = http.createServer(Vault._onRequest).listen(port);
}
end() {
this._server.close();
}
}
const vault = new Vault();
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, () => process.exit(0));
});
// eslint-disable-next-line no-console
console.log('Starting Vault Mock...');
vault.start();

65
eve/workers/pod.yml Normal file
View File

@ -0,0 +1,65 @@
apiVersion: v1
kind: Pod
metadata:
name: "utapi-test-pod"
spec:
activeDeadlineSeconds: 3600
restartPolicy: Never
terminationGracePeriodSeconds: 10
containers:
- name: aggressor
image: "{{ images.aggressor }}"
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 500m
memory: 3Gi
limits:
cpu: "2"
memory: 3Gi
volumeMounts:
- mountPath: /var/run/docker.sock
name: docker-socket
- name: artifacts
readOnly: false
mountPath: /artifacts
- name: warp10
image: "{{ images.warp10 }}"
command:
- sh
- -ce
- /init | tee -a /artifacts/warp10.log
env:
- name: standalone.port
value: '4802'
- name: warpscript.maxops
value: '10000000'
resources:
requests:
cpu: 500m
memory: 1Gi
limits:
cpu: 1750m
memory: 3Gi
volumeMounts:
- name: artifacts
readOnly: false
mountPath: /artifacts
{% if vars.vault is defined and vars.vault == 'enabled' %}
- name: vault
image: "{{ images.vault }}"
resources:
requests:
cpu: 10m
memory: 64Mi
limits:
cpu: 50m
memory: 128Mi
{% endif %}
volumes:
- name: docker-socket
hostPath:
path: /var/run/docker.sock
type: Socket
- name: artifacts
emptyDir: {}

View File

@ -0,0 +1,48 @@
FROM buildpack-deps:jessie-curl
#
# Install apt packages needed by utapi and buildbot_worker
#
ENV LANG C.UTF-8
ENV NODE_VERSION 10.22.0
ENV PATH=$PATH:/utapi/node_modules/.bin
ENV NODE_PATH=/utapi/node_modules
COPY eve/workers/unit_and_feature_tests/utapi_packages.list eve/workers/unit_and_feature_tests/buildbot_worker_packages.list /tmp/
WORKDIR /utapi
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update -qq \
&& cat /tmp/*packages.list | xargs apt-get install -y \
&& pip install pip==9.0.1 \
&& rm -rf /var/lib/apt/lists/* \
&& rm -f /tmp/*packages.list \
&& rm -f /etc/supervisor/conf.d/*.conf \
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
#
# Install yarn dependencies
#
COPY package.json yarn.lock /utapi/
RUN yarn cache clean \
&& yarn install --frozen-lockfile \
&& yarn cache clean
#
# Run buildbot-worker on startup through supervisor
#
ARG BUILDBOT_VERSION
RUN pip install buildbot-worker==$BUILDBOT_VERSION
RUN pip3 install requests
RUN pip3 install redis
ADD eve/workers/unit_and_feature_tests/supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
ADD eve/workers/unit_and_feature_tests/redis/sentinel.conf /etc/sentinel.conf
CMD ["supervisord", "-n"]

View File

@ -0,0 +1,11 @@
ca-certificates
git
libffi-dev
libssl-dev
python2.7
python2.7-dev
python-pip
sudo
supervisor
lsof
netcat

View File

@ -0,0 +1,35 @@
# Example sentinel.conf
# The port that this sentinel instance will run on
port 16379
# Specify the log file name. Also the empty string can be used to force
# Sentinel to log on the standard output. Note that if you use standard
# output for logging but daemonize, logs will be sent to /dev/null
logfile ""
# dir <working-directory>
# Every long running process should have a well-defined working directory.
# For Redis Sentinel to chdir to /tmp at startup is the simplest thing
# for the process to don't interfere with administrative tasks such as
# unmounting filesystems.
dir /tmp
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
#
# Tells Sentinel to monitor this master, and to consider it in O_DOWN
# (Objectively Down) state only if at least <quorum> sentinels agree.
#
# Note that whatever is the ODOWN quorum, a Sentinel will require to
# be elected by the majority of the known Sentinels in order to
# start a failover, so no failover can be performed in minority.
#
# Replicas are auto-discovered, so you don't need to specify replicas in
# any way. Sentinel itself will rewrite this configuration file adding
# the replicas using additional configuration options.
# Also note that the configuration file is rewritten when a
# replica is promoted to master.
#
# Note: master name should not include special characters or spaces.
# The valid charset is A-z 0-9 and the three characters ".-_".
sentinel monitor scality-s3 127.0.0.1 6379 1

View File

@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
SETUP_CMD="start" SETUP_CMD="start"
fi fi
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "setup_$2.log" & UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
bash tests/utils/wait_for_local_port.bash $PORT 40 bash tests/utils/wait_for_local_port.bash $PORT 40
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "test_$2.log" UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"

View File

@ -0,0 +1,14 @@
[program:buildbot_worker]
command=/bin/sh -c 'buildbot-worker create-worker . "%(ENV_BUILDMASTER)s:%(ENV_BUILDMASTER_PORT)s" "%(ENV_WORKERNAME)s" "%(ENV_WORKERPASS)s" && buildbot-worker start --nodaemon'
autostart=true
autorestart=false
[program:redis_server]
command=/usr/bin/redis-server
autostart=true
autorestart=false
[program:redis_sentinel]
command=/usr/bin/redis-server /etc/sentinel.conf --sentinel
autostart=true
autorestart=false

View File

@ -0,0 +1,5 @@
build-essential
redis-server
python3
python3-pip
yarn

View File

@ -1,90 +0,0 @@
import sys, os, base64, datetime, hashlib, hmac, datetime, calendar, json
import requests # pip install requests
access_key = '9EQTVVVCLSSG6QBMNKO5'
secret_key = 'T5mK/skkkwJ/mTjXZnHyZ5UzgGIN=k9nl4dyTmDH'
method = 'POST'
service = 's3'
host = 'localhost:8100'
region = 'us-east-1'
canonical_uri = '/buckets'
canonical_querystring = 'Action=ListMetrics&Version=20160815'
content_type = 'application/x-amz-json-1.0'
algorithm = 'AWS4-HMAC-SHA256'
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d')
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
def get_start_time(t):
start = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(start.utctimetuple()) * 1000;
def get_end_time(t):
end = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(end.utctimetuple()) * 1000 - 1;
start_time = get_start_time(datetime.datetime(2016, 1, 1, 0, 0, 0, 0))
end_time = get_end_time(datetime.datetime(2016, 2, 1, 0, 0, 0, 0))
# Request parameters for listing Utapi bucket metrics--passed in a JSON block.
bucketListing = {
'buckets': [ 'utapi-test' ],
'timeRange': [ start_time, end_time ],
}
request_parameters = json.dumps(bucketListing)
payload_hash = hashlib.sha256(request_parameters).hexdigest()
canonical_headers = \
'content-type:{0}\nhost:{1}\nx-amz-content-sha256:{2}\nx-amz-date:{3}\n' \
.format(content_type, host, payload_hash, amz_date)
signed_headers = 'content-type;host;x-amz-content-sha256;x-amz-date'
canonical_request = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}' \
.format(method, canonical_uri, canonical_querystring, canonical_headers,
signed_headers, payload_hash)
credential_scope = '{0}/{1}/{2}/aws4_request' \
.format(date_stamp, region, service)
string_to_sign = '{0}\n{1}\n{2}\n{3}' \
.format(algorithm, amz_date, credential_scope,
hashlib.sha256(canonical_request).hexdigest())
signing_key = getSignatureKey(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
hashlib.sha256).hexdigest()
authorization_header = \
'{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}' \
.format(algorithm, access_key, credential_scope, signed_headers, signature)
# The 'host' header is added automatically by the Python 'requests' library.
headers = {
'Content-Type': content_type,
'X-Amz-Content-Sha256': payload_hash,
'X-Amz-Date': amz_date,
'Authorization': authorization_header
}
endpoint = 'http://' + host + canonical_uri + '?' + canonical_querystring;
r = requests.post(endpoint, data=request_parameters, headers=headers)
print (r.text)

View File

@ -1,20 +0,0 @@
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json
WORKDIR ${HOME_DIR}/utapi
COPY ./package.json ./yarn.lock ${HOME_DIR}/utapi
# Remove when gitcache is sorted out
RUN rm /root/.gitconfig
RUN yarn install --production --frozen-lockfile --network-concurrency 1
COPY . ${HOME_DIR}/utapi
RUN chown -R ${USER} ${HOME_DIR}/utapi
USER ${USER}
CMD bash -c "source ${CONF_DIR}/env && export && supervisord -c ${CONF_DIR}/${SUPERVISORD_CONF}"

View File

@ -1,17 +0,0 @@
FROM redis:alpine
ENV S6_VERSION 2.0.0.1
ENV EXPORTER_VERSION 1.24.0
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz
RUN wget https://github.com/oliver006/redis_exporter/releases/download/v${EXPORTER_VERSION}/redis_exporter-v${EXPORTER_VERSION}.linux-amd64.tar.gz -O redis_exporter.tar.gz \
&& tar xzf redis_exporter.tar.gz -C / \
&& cd .. \
&& mv /redis_exporter-v${EXPORTER_VERSION}.linux-amd64/redis_exporter /usr/local/bin/redis_exporter
ADD ./images/redis/s6 /etc
CMD /init

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis exporter"
exec redis_exporter

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis"
exec redis-server

View File

@ -1,2 +0,0 @@
standalone.host = 0.0.0.0
standalone.port = 4802

View File

@ -1,32 +1,14 @@
FROM golang:1.14-alpine as builder FROM warp10io/warp10:2.7.5
ENV WARP10_EXPORTER_VERSION 2.7.5
RUN apk add zip unzip build-base \
&& wget -q -O exporter.zip https://github.com/centreon/warp10-sensision-exporter/archive/refs/heads/master.zip \
&& unzip exporter.zip \
&& cd warp10-sensision-exporter-master \
&& go mod download \
&& cd tools \
&& go run generate_sensision_metrics.go ${WARP10_EXPORTER_VERSION} \
&& cp sensision.go ../collector/ \
&& cd .. \
&& go build -a -o /usr/local/go/warp10_sensision_exporter
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
# Override baked in version
# Remove when updating to a numbered release
ENV WARP10_VERSION 2.8.1-95-g73e7de80
ENV S6_VERSION 2.0.0.1 ENV S6_VERSION 2.0.0.1
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2 ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
ENV WARP10_CONF_TEMPLATES ${WARP10_HOME}/conf.templates/standalone ENV WARP10_CONF_TEMPLATES ${WARP10_HOME}/conf.templates/standalone
ENV SENSISION_DATA_DIR /data/sensision ENV SENSISION_DATA_DIR /data/sensision
ENV SENSISION_PORT 8082
# Modify Warp 10 default config # Modify Warp 10 default config
ENV standalone.host 0.0.0.0
ENV standalone.port 4802
ENV standalone.home /opt/warp10 ENV standalone.home /opt/warp10
ENV warpscript.repository.directory /usr/local/share/warpscript ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens ENV warp.token.file /static.tokens
@ -38,19 +20,10 @@ RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_V
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \ && tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz && rm -rf /tmp/s6-overlay-amd64.tar.gz
# Install jmx exporter
ADD https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar /opt/jmx_prom_agent.jar
ADD ./images/warp10/jmx_prom.yaml /opt/jmx_prom.yaml
# Install protobuf extestion # Install protobuf extestion
ADD ./images/warp10/warp10-ext-protobuf-1.2.2-uberjar.jar /opt/warp10/lib/ ADD https://dl.bintray.com/senx/maven/io/warp10/warp10-ext-protobuf/1.1.0-uberjar/warp10-ext-protobuf-1.1.0-uberjar.jar /opt/warp10/lib/
# Install Sensision exporter
COPY --from=builder /usr/local/go/warp10_sensision_exporter /usr/local/bin/warp10_sensision_exporter
ADD ./images/warp10/s6 /etc ADD ./images/warp10/s6 /etc
ADD ./warpscript /usr/local/share/warpscript ADD ./warpscript /usr/local/share/warpscript
ADD ./images/warp10/static.tokens / ADD ./images/warp10/static.tokens /
ADD ./images/warp10/90-default-host-port.conf $WARP10_CONF_TEMPLATES/90-default-host-port.conf
CMD /init CMD /init

View File

@ -1,2 +0,0 @@
---
startDelaySeconds: 0

View File

@ -1,12 +0,0 @@
#!/usr/bin/with-contenv sh
EXPORTER_CMD="warp10_sensision_exporter --warp10.url=http://localhost:${SENSISION_PORT}/metrics"
if [ -f "/usr/local/bin/warp10_sensision_exporter" -a -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision exporter with $EXPORTER_CMD ..."
exec $EXPORTER_CMD
else
echo "Sensision is disabled. Not starting exporter."
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -3,8 +3,9 @@
JAVA="/usr/bin/java" JAVA="/usr/bin/java"
JAVA_OPTS="" JAVA_OPTS=""
VERSION=1.0.21
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${SENSISION_VERSION}.jar SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${VERSION}.jar
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR} SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
SENSISION_CLASS=io.warp10.sensision.Main SENSISION_CLASS=io.warp10.sensision.Main
export MALLOC_ARENA_MAX=1 export MALLOC_ARENA_MAX=1
@ -13,13 +14,13 @@ if [ -z "$SENSISION_HEAP" ]; then
SENSISION_HEAP=64m SENSISION_HEAP=64m
fi fi
SENSISION_CMD="${JAVA} ${JAVA_OPTS} -Xmx${SENSISION_HEAP} -Dsensision.server.port=${SENSISION_PORT} ${SENSISION_OPTS} -Dsensision.config=${SENSISION_CONFIG} -cp ${SENSISION_CP} ${SENSISION_CLASS}" SENSISION_CMD="${JAVA} ${JAVA_OPTS} -Xmx${SENSISION_HEAP} -Dsensision.server.port=0 ${SENSISION_OPTS} -Dsensision.config=${SENSISION_CONFIG} -cp ${SENSISION_CP} ${SENSISION_CLASS}"
if [ -n "$ENABLE_SENSISION" ]; then if [ -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision with $SENSISION_CMD ..." echo "Starting Sensision with $SENSISION_CMD ..."
exec $SENSISION_CMD | tee -a ${SENSISION_HOME}/logs/sensision.log exec $SENSISION_CMD | tee -a ${SENSISION_HOME}/logs/sensision.log
else else
echo "Sensision is disabled. Not starting." echo "Sensision is disabled"
# wait indefinitely # wait indefinitely
exec tail -f /dev/null exec tail -f /dev/null
fi fi

View File

@ -8,7 +8,7 @@ WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CLASS=io.warp10.standalone.Warp WARP10_CLASS=io.warp10.standalone.Warp
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*" WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf" WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path '*/.*' -name '*.conf' | sort | tr '\n' ' ' 2> /dev/null)" CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path "*/\.*" -name "*.conf" | sort | tr '\n' ' ' 2> /dev/null)"
LOG4J_CONF=${WARP10_HOME}/etc/log4j.properties LOG4J_CONF=${WARP10_HOME}/etc/log4j.properties
if [ -z "$WARP10_HEAP" ]; then if [ -z "$WARP10_HEAP" ]; then
@ -19,7 +19,7 @@ if [ -z "$WARP10_HEAP_MAX" ]; then
WARP10_HEAP_MAX=4g WARP10_HEAP_MAX=4g
fi fi
JAVA_OPTS="-Dlog4j.configuration=file:${LOG4J_CONF} ${JAVA__EXTRA_OPTS} -Djava.awt.headless=true -Xms${WARP10_HEAP} -Xmx${WARP10_HEAP_MAX} -XX:+UseG1GC" JAVA_OPTS="-Djava.awt.headless=true -Xms${WARP10_HEAP} -Xmx${WARP10_HEAP_MAX} -XX:+UseG1GC ${JAVA_OPTS}"
SENSISION_OPTS= SENSISION_OPTS=
if [ -n "$ENABLE_SENSISION" ]; then if [ -n "$ENABLE_SENSISION" ]; then
@ -28,16 +28,10 @@ if [ -n "$ENABLE_SENSISION" ]; then
if [ -n "$SENSISION_LABELS" ]; then if [ -n "$SENSISION_LABELS" ]; then
_SENSISION_LABELS="-Dsensision.default.labels=$SENSISION_LABELS" _SENSISION_LABELS="-Dsensision.default.labels=$SENSISION_LABELS"
fi fi
SENSISION_OPTS="${_SENSISION_LABELS} -Dsensision.events.dir=/var/run/sensision/metrics -Dfile.encoding=UTF-8 ${SENSISION_EXTRA_OPTS}" SENSISION_OPTS="-Dsensision.server.port=0 ${_SENSISION_LABELS} -Dsensision.events.dir=/var/run/sensision/metrics -Dfile.encoding=UTF-8"
fi fi
JMX_EXPORTER_OPTS= WARP10_CMD="${JAVA} -Dlog4j.configuration=file:${LOG4J_CONF} ${JAVA_OPTS} ${SENSISION_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
if [ -n "$ENABLE_JMX_EXPORTER" ]; then
JMX_EXPORTER_OPTS="-javaagent:/opt/jmx_prom_agent.jar=4803:/opt/jmx_prom.yaml ${JMX_EXPORTER_EXTRA_OPTS}"
echo "Starting jmx exporter with Warp 10."
fi
WARP10_CMD="${JAVA} ${JMX_EXPORTER_OPTS} ${JAVA_OPTS} ${SENSISION_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
echo "Starting Warp 10 with $WARP10_CMD ..." echo "Starting Warp 10 with $WARP10_CMD ..."
exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log

View File

@ -1,4 +1,5 @@
/* eslint-disable global-require */ /* eslint-disable global-require */
// eslint-disable-line strict // eslint-disable-line strict
let toExport; let toExport;

View File

@ -1,13 +1,35 @@
/* eslint-disable no-bitwise */ /* eslint-disable no-bitwise */
const assert = require('assert'); const assert = require('assert');
const fs = require('fs'); const fs = require('fs');
const path = require('path');
/** /**
* Reads from a config file and returns the content as a config object * Reads from a config file and returns the content as a config object
*/ */
class Config { class Config {
constructor(config) { constructor() {
this.component = config.component; /*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
this._basePath = path.resolve(__dirname, '..');
this.path = `${this._basePath}/config.json`;
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
this.path = process.env.UTAPI_CONFIG_FILE;
}
// Read config automatically
this._getConfig();
}
_getConfig() {
let config;
try {
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
config = JSON.parse(data);
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
this.port = 9500; this.port = 9500;
if (config.port !== undefined) { if (config.port !== undefined) {
@ -93,13 +115,6 @@ class Config {
} }
} }
if (config.vaultclient) {
// Instance passed from outside
this.vaultclient = config.vaultclient;
this.vaultd = null;
} else {
// Connection data
this.vaultclient = null;
this.vaultd = {}; this.vaultd = {};
if (config.vaultd) { if (config.vaultd) {
if (config.vaultd.port !== undefined) { if (config.vaultd.port !== undefined) {
@ -114,7 +129,6 @@ class Config {
this.vaultd.host = config.vaultd.host; this.vaultd.host = config.vaultd.host;
} }
} }
}
if (config.certFilePaths) { if (config.certFilePaths) {
assert(typeof config.certFilePaths === 'object' assert(typeof config.certFilePaths === 'object'
@ -127,11 +141,12 @@ class Config {
const { key, cert, ca } = config.certFilePaths const { key, cert, ca } = config.certFilePaths
? config.certFilePaths : {}; ? config.certFilePaths : {};
if (key && cert) { if (key && cert) {
const keypath = key; const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
const certpath = cert; const certpath = (cert[0] === '/')
? cert : `${this._basePath}/${cert}`;
let capath; let capath;
if (ca) { if (ca) {
capath = ca; capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK), assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`); `File not found or unreachable: ${capath}`);
} }
@ -157,13 +172,8 @@ class Config {
+ 'expireMetrics must be a boolean'); + 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics; this.expireMetrics = config.expireMetrics;
} }
return config;
if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}
} }
} }
module.exports = Config; module.exports = new Config();

View File

@ -81,17 +81,6 @@ class Datastore {
return this._client.call((backend, done) => backend.incr(key, done), cb); return this._client.call((backend, done) => backend.incr(key, done), cb);
} }
/**
* increment value of a key by the provided value
* @param {string} key - key holding the value
* @param {string} value - value containing the data
* @param {callback} cb - callback
* @return {undefined}
*/
incrby(key, value, cb) {
return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
}
/** /**
* decrement value of a key by 1 * decrement value of a key by 1
* @param {string} key - key holding the value * @param {string} key - key holding the value

View File

@ -6,6 +6,8 @@ const async = require('async');
const { errors } = require('arsenal'); const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema'); const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse'); const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const Vault = require('./Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month. const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -21,6 +23,7 @@ class ListMetrics {
constructor(metric, component) { constructor(metric, component) {
this.metric = metric; this.metric = metric;
this.service = component; this.service = component;
this.vault = new Vault(config);
} }
/** /**
@ -80,10 +83,9 @@ class ListMetrics {
const resources = validator.get(this.metric); const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange'); const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids // map account ids to canonical ids
if (this.metric === 'accounts') { if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => { return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) { if (err) {
return cb(err); return cb(err);
} }
@ -122,28 +124,7 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end]; const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault(); async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log,
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
return async.mapLimit(list.message.body, 5,
(item, next) => this.getMetrics(item.canonicalId, timeRange,
datastore, log, (err, res) => {
if (err) {
return next(err);
}
return next(null, Object.assign({}, res,
{ accountId: item.accountId }));
}),
cb);
});
}
return async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log,
next), cb); next), cb);
} }
@ -312,10 +293,11 @@ class ListMetrics {
}); });
if (!areMetricsPositive) { if (!areMetricsPositive) {
log.info('negative metric value found', { return cb(errors.InternalError.customizeDescription(
error: resource, 'Utapi is in a transient state for this time period as '
method: 'ListMetrics.getMetrics', + 'metrics are being collected. Please try again in a few '
}); + 'minutes.',
));
} }
/** /**
* Batch result is of the format * Batch result is of the format

View File

@ -63,6 +63,7 @@ const methods = {
getObjectTagging: { method: '_genericPushMetric', changesData: false }, getObjectTagging: { method: '_genericPushMetric', changesData: false },
putObject: { method: '_genericPushMetricPutObject', changesData: true }, putObject: { method: '_genericPushMetricPutObject', changesData: true },
copyObject: { method: '_genericPushMetricPutObject', changesData: true }, copyObject: { method: '_genericPushMetricPutObject', changesData: true },
putData: { method: '_genericPushMetricPutObject', changesData: true },
putObjectAcl: { method: '_genericPushMetric', changesData: true }, putObjectAcl: { method: '_genericPushMetric', changesData: true },
putObjectLegalHold: { method: '_genericPushMetric', changesData: true }, putObjectLegalHold: { method: '_genericPushMetric', changesData: true },
putObjectRetention: { method: '_genericPushMetric', changesData: true }, putObjectRetention: { method: '_genericPushMetric', changesData: true },
@ -90,16 +91,12 @@ const methods = {
}, },
putBucketObjectLock: { method: '_genericPushMetric', changesData: true }, putBucketObjectLock: { method: '_genericPushMetric', changesData: true },
getBucketObjectLock: { method: '_genericPushMetric', changesData: true }, getBucketObjectLock: { method: '_genericPushMetric', changesData: true },
replicateObject: { method: '_genericPushMetricPutObject', changesData: true },
replicateTags: { method: '_genericPushMetric', changesData: true },
replicateDelete: { method: '_pushMetricDeleteMarkerObject', changesData: true },
}; };
const metricObj = { const metricObj = {
buckets: 'bucket', buckets: 'bucket',
accounts: 'accountId', accounts: 'accountId',
users: 'userId', users: 'userId',
location: 'location',
}; };
class UtapiClient { class UtapiClient {
@ -123,17 +120,13 @@ class UtapiClient {
const api = (config || {}).logApi || werelogs; const api = (config || {}).logApi || werelogs;
this.log = new api.Logger('UtapiClient'); this.log = new api.Logger('UtapiClient');
// By default, we push all resource types // By default, we push all resource types
this.metrics = ['buckets', 'accounts', 'users', 'service', 'location']; this.metrics = ['buckets', 'accounts', 'users', 'service'];
this.service = 's3'; this.service = 's3';
this.disableOperationCounters = false; this.disableOperationCounters = false;
this.enabledOperationCounters = []; this.enabledOperationCounters = [];
this.disableClient = true; this.disableClient = true;
if (config && !config.disableClient) { if (config && !config.disableClient) {
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
if (config.metrics) { if (config.metrics) {
const message = 'invalid property in UtapiClient configuration'; const message = 'invalid property in UtapiClient configuration';
assert(Array.isArray(config.metrics), `${message}: metrics ` assert(Array.isArray(config.metrics), `${message}: metrics `
@ -161,6 +154,9 @@ class UtapiClient {
if (config.enabledOperationCounters) { if (config.enabledOperationCounters) {
this.enabledOperationCounters = config.enabledOperationCounters; this.enabledOperationCounters = config.enabledOperationCounters;
} }
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
} }
} }
@ -544,13 +540,10 @@ class UtapiClient {
const paramsArr = this._getParamsArr(params); const paramsArr = this._getParamsArr(params);
paramsArr.forEach(p => { paramsArr.forEach(p => {
cmds.push(['incr', generateCounter(p, 'numberOfObjectsCounter')]); cmds.push(['incr', generateCounter(p, 'numberOfObjectsCounter')]);
const counterAction = action === 'putDeleteMarkerObject' ? 'deleteObject' : action; if (this._isCounterEnabled('deleteObject')) {
if (this._isCounterEnabled(counterAction)) { cmds.push(['incr', generateKey(p, 'deleteObject', timestamp)]);
cmds.push(['incr', generateKey(p, counterAction, timestamp)]);
} }
cmds.push(['zrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp]);
}); });
return this.ds.batch(cmds, (err, results) => { return this.ds.batch(cmds, (err, results) => {
if (err) { if (err) {
log.error('error pushing metric', { log.error('error pushing metric', {
@ -584,48 +577,13 @@ class UtapiClient {
// empty. // empty.
actionCounter = Number.isNaN(actionCounter) actionCounter = Number.isNaN(actionCounter)
|| actionCounter < 0 ? 1 : actionCounter; || actionCounter < 0 ? 1 : actionCounter;
if (Number.isInteger(params.byteLength)) {
/* byteLength is passed in from cloudserver under the follow conditions:
* - bucket versioning is suspended
* - object version id is null
* - the content length of the object exists
* In this case, the master key is deleted and replaced with a delete marker.
* The decrement accounts for the deletion of the master key when utapi reports
* on the number of objects.
*/
actionCounter -= 1;
}
const key = generateStateKey(p, 'numberOfObjects'); const key = generateStateKey(p, 'numberOfObjects');
const byteArr = results[index + commandsGroupSize - 1][1];
const oldByteLength = byteArr ? parseInt(byteArr[0], 10) : 0;
const newByteLength = member.serialize(Math.max(0, oldByteLength - params.byteLength));
cmds2.push( cmds2.push(
['zremrangebyscore', key, timestamp, timestamp], ['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(actionCounter)], ['zadd', key, timestamp, member.serialize(actionCounter)],
); );
if (Number.isInteger(params.byteLength)) {
cmds2.push(
['decr', generateCounter(p, 'numberOfObjectsCounter')],
['decrby', generateCounter(p, 'storageUtilizedCounter'), params.byteLength],
);
}
if (byteArr) {
cmds2.push(
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), timestamp, newByteLength],
);
}
return true; return true;
}); });
if (noErr) { if (noErr) {
return this.ds.batch(cmds2, cb); return this.ds.batch(cmds2, cb);
} }
@ -1067,10 +1025,10 @@ class UtapiClient {
storageUtilizedDelta], storageUtilizedDelta],
[redisCmd, generateCounter(p, 'numberOfObjectsCounter')], [redisCmd, generateCounter(p, 'numberOfObjectsCounter')],
); );
if (this._isCounterEnabled(action)) { if (action !== 'putData' && this._isCounterEnabled(action)) {
cmds.push(['incr', generateKey(p, action, timestamp)]); cmds.push(['incr', generateKey(p, action, timestamp)]);
} }
if (action === 'putObject' || action === 'replicateObject') { if (action === 'putObject' || action === 'putData') {
cmds.push( cmds.push(
['incrby', generateKey(p, 'incomingBytes', timestamp), ['incrby', generateKey(p, 'incomingBytes', timestamp),
newByteLength], newByteLength],
@ -1156,69 +1114,6 @@ class UtapiClient {
}); });
} }
/**
*
* @param {string} location - name of data location
* @param {number} updateSize - size in bytes to update location metric by,
* could be negative, indicating deleted object
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
pushLocationMetric(location, updateSize, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
this._checkMetricTypes(params);
const action = (updateSize < 0) ? 'decrby' : 'incrby';
const size = (updateSize < 0) ? -updateSize : updateSize;
return this.ds[action](generateKey(params, 'locationStorage'), size,
err => {
if (err) {
log.error('error pushing metric', {
method: 'UtapiClient.pushLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
return callback();
});
}
/**
*
* @param {string} location - name of data backend to get metric for
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
getLocationMetric(location, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
const redisKey = generateKey(params, 'locationStorage');
return this.ds.get(redisKey, (err, bytesStored) => {
if (err) {
log.error('error getting metric', {
method: 'UtapiClient: getLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
// if err and bytesStored are null, key does not exist yet
if (bytesStored === null) {
return callback(null, 0);
}
return callback(null, bytesStored);
});
}
/** /**
* Get storage used by bucket/account/user/service * Get storage used by bucket/account/user/service
* @param {object} params - params for the metrics * @param {object} params - params for the metrics

View File

@ -12,23 +12,16 @@ const RedisClient = require('../libV2/redis');
const REINDEX_SCHEDULE = '0 0 * * Sun'; const REINDEX_SCHEDULE = '0 0 * * Sun';
const REINDEX_LOCK_KEY = 's3:utapireindex:lock'; const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
const REINDEX_LOCK_TTL = (60 * 60) * 24; const REINDEX_LOCK_TTL = (60 * 60) * 24;
const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== undefined
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex { class UtapiReindex {
constructor(config) { constructor(config) {
this._enabled = false; this._enabled = false;
this._schedule = REINDEX_SCHEDULE; this._schedule = REINDEX_SCHEDULE;
this._redis = { this._sentinel = {
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
host: '127.0.0.1', host: '127.0.0.1',
port: 16379, port: 16379,
}], name: 'scality-s3',
sentinelPassword: '',
}; };
this._bucketd = { this._bucketd = {
host: '127.0.0.1', host: '127.0.0.1',
@ -46,13 +39,14 @@ class UtapiReindex {
if (config && config.password) { if (config && config.password) {
this._password = config.password; this._password = config.password;
} }
if (config && config.redis) { if (config && config.sentinel) {
const { const {
name, sentinelPassword, sentinels, host, port, name, sentinelPassword,
} = config.redis; } = config.sentinel;
this._redis.name = name || this._redis.name; this._sentinel.host = host || this._sentinel.host;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword; this._sentinel.port = port || this._sentinel.port;
this._redis.sentinels = sentinels || this._redis.sentinels; this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
} }
if (config && config.bucketd) { if (config && config.bucketd) {
const { host, port } = config.bucketd; const { host, port } = config.bucketd;
@ -64,16 +58,17 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump }); this._log = new werelogs.Logger('UtapiReindex', { level, dump });
} }
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
this._requestLogger = this._log.newRequestLogger(); this._requestLogger = this._log.newRequestLogger();
} }
_getRedisClient() { _getRedisClient() {
const client = new RedisClient({ const client = new RedisClient({
sentinels: this._redis.sentinels, sentinels: [{
name: this._redis.name, host: this._sentinel.host,
sentinelPassword: this._redis.sentinelPassword, port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password, password: this._password,
}); });
client.connect(); client.connect();
@ -88,18 +83,17 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY); return this.ds.del(REINDEX_LOCK_KEY);
} }
_buildFlags(sentinel) { _buildFlags() {
const flags = { const flags = {
/* eslint-disable camelcase */ /* eslint-disable camelcase */
sentinel_ip: sentinel.host, sentinel_ip: this._sentinel.host,
sentinel_port: sentinel.port, sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._redis.name, sentinel_cluster_name: this._sentinel.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`, bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
}; };
if (this._redis.sentinelPassword) { if (this._sentinel.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword; flags.redis_password = this._sentinel.sentinelPassword;
} }
/* eslint-enable camelcase */ /* eslint-enable camelcase */
const opts = []; const opts = [];
Object.keys(flags) Object.keys(flags)
@ -108,17 +102,17 @@ class UtapiReindex {
opts.push(name); opts.push(name);
opts.push(flags[flag]); opts.push(flags[flag]);
}); });
if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts; return opts;
} }
_runScriptWithSentinels(path, remainingSentinels, done) { _runScript(path, done) {
const flags = this._buildFlags(remainingSentinels.shift()); const flags = this._buildFlags();
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`); this._requestLogger.debug(`launching subprocess ${path} `
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]); + `with flags: ${flags}`);
const process = childProcess.spawn('python3.4', [
path,
...flags,
]);
process.stdout.on('data', data => { process.stdout.on('data', data => {
this._requestLogger.info('received output from script', { this._requestLogger.info('received output from script', {
output: Buffer.from(data).toString(), output: Buffer.from(data).toString(),
@ -143,17 +137,6 @@ class UtapiReindex {
statusCode: code, statusCode: code,
script: path, script: path,
}); });
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else { } else {
this._requestLogger.info('script exited successfully', { this._requestLogger.info('script exited successfully', {
statusCode: code, statusCode: code,
@ -164,11 +147,6 @@ class UtapiReindex {
}); });
} }
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) { _attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job'); this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock() this._lock()

View File

@ -14,15 +14,6 @@ class UtapiRequest {
this._datastore = null; this._datastore = null;
this._requestQuery = null; this._requestQuery = null;
this._requestPath = null; this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
} }
/** /**

View File

@ -1,21 +1,16 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests import requests
import redis
import json
import ast
import sys import sys
from threading import Thread
import time import time
import urllib import urllib
import re
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO) import argparse
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -34,19 +29,8 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None): def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password self._password = password
r = redis.Redis( r = redis.Redis(host=ip, port=port, db=0, password=password)
host=ip,
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name) self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name): def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password) r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)

View File

@ -1,6 +1,5 @@
import argparse import argparse
import concurrent.futures as futures import concurrent.futures as futures
import functools
import itertools import itertools
import json import json
import logging import logging
@ -9,9 +8,9 @@ import re
import sys import sys
import time import time
import urllib import urllib
from pathlib import Path
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
import redis import redis
import requests import requests
@ -25,9 +24,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100 ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP") parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -35,39 +31,9 @@ def get_options():
parser.add_argument("-v", "--redis-password", default=None, help="Redis AUTH Password") parser.add_argument("-v", "--redis-password", default=None, help="Redis AUTH Password")
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name") parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server") parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers") parser.add_argument("-w", "--worker", default=10, help="Number of workers")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request") parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy") return parser.parse_args()
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
def chunks(iterable, size): def chunks(iterable, size):
it = iter(iterable) it = iter(iterable)
@ -82,38 +48,22 @@ def _encoded(func):
return urllib.parse.quote(val.encode('utf-8')) return urllib.parse.quote(val.encode('utf-8'))
return inner return inner
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled']) Bucket = namedtuple('Bucket', ['userid', 'name'])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id']) MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size']) BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
class MaxRetriesReached(Exception):
def __init__(self, url):
super().__init__('Max retries reached for request to %s'%url)
class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient: class BucketDClient:
'''Performs Listing calls against bucketd''' '''Performs Listing calls against bucketd'''
__url_attribute_format = '{addr}/default/attributes/{bucket}' __url_format = '{addr}/default/bucket/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"} __headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False): def __init__(self, bucketd_addr=None):
self._bucketd_addr = bucketd_addr self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session() self._session = requests.Session()
def _do_req(self, url, check_500=True, **kwargs): def _do_req(self, url, check_500=True, **kwargs):
# Add 1 for the initial request while True:
for x in range(self._max_retries + 1):
try: try:
resp = self._session.get(url, timeout=30, verify=False, headers=self.__headers, **kwargs) resp = self._session.get(url, timeout=30, verify=False, headers=self.__headers, **kwargs)
if check_500 and resp.status_code == 500: if check_500 and resp.status_code == 500:
@ -126,8 +76,6 @@ class BucketDClient:
_log.error('Error during listing, sleeping 5 secs %s'%url) _log.error('Error during listing, sleeping 5 secs %s'%url)
time.sleep(5) time.sleep(5)
raise MaxRetriesReached(url)
def _list_bucket(self, bucket, **kwargs): def _list_bucket(self, bucket, **kwargs):
''' '''
Lists a bucket lazily until "empty" Lists a bucket lazily until "empty"
@ -140,7 +88,7 @@ class BucketDClient:
parameters value. On the first request the function will be called with parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded. `None` and should return its initial value. Return `None` for the param to be excluded.
''' '''
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket) url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)} static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)} dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop is_truncated = True # Set to True for first loop
@ -153,9 +101,6 @@ class BucketDClient:
_log.debug('listing bucket bucket: %s params: %s'%( _log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items()))) bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params) resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200: if resp.status_code == 200:
payload = resp.json() payload = resp.json()
except ValueError as e: except ValueError as e:
@ -163,9 +108,6 @@ class BucketDClient:
_log.error('Invalid listing response body! bucket:%s params:%s'%( _log.error('Invalid listing response body! bucket:%s params:%s'%(
bucket, ', '.join('%s=%s'%p for p in params.items()))) bucket, ', '.join('%s=%s'%p for p in params.items())))
continue continue
except MaxRetriesReached:
_log.error('Max retries reached listing bucket:%s'%bucket)
raise
except Exception as e: except Exception as e:
_log.exception(e) _log.exception(e)
_log.error('Unhandled exception during listing! bucket:%s params:%s'%( _log.error('Unhandled exception during listing! bucket:%s params:%s'%(
@ -177,37 +119,7 @@ class BucketDClient:
else: else:
is_truncated = len(payload) > 0 is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16) def list_buckets(self, name = None):
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def get_next_marker(p): def get_next_marker(p):
if p is None: if p is None:
@ -219,24 +131,19 @@ class BucketDClient:
'maxKeys': 1000, 'maxKeys': 1000,
'marker': get_next_marker 'marker': get_next_marker
} }
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params): for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = [] buckets = []
for result in payload.get('Contents', []): for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key']) match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False) bucket = Bucket(*match.groups())
# We need to get the attributes for each bucket to determine if it is locked if name is None or bucket.name == name:
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket) buckets.append(bucket)
if buckets: if buckets:
yield buckets yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
def list_mpus(self, bucket): def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name _bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@ -273,12 +180,15 @@ class BucketDClient:
upload_id=key['value']['UploadId'])) upload_id=key['value']['UploadId']))
return keys return keys
def _sum_objects(self, bucket, listing, only_latest_when_locked = False): def _sum_objects(self, listing):
count = 0 count = 0
total_size = 0 total_size = 0
last_key = None last_master = None
try: last_size = None
for obj in listing: for _, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
for obj in contents:
count += 1
if isinstance(obj['value'], dict): if isinstance(obj['value'], dict):
# bucketd v6 returns a dict: # bucketd v6 returns a dict:
data = obj.get('value', {}) data = obj.get('value', {})
@ -287,51 +197,39 @@ class BucketDClient:
# bucketd v7 returns an encoded string # bucketd v7 returns an encoded string
data = json.loads(obj['value']) data = json.loads(obj['value'])
size = data.get('content-length', 0) size = data.get('content-length', 0)
is_latest = obj['key'] != last_key
last_key = obj['key']
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue
count += 1
total_size += size total_size += size
except InvalidListing: # If versioned, subtract the size of the master to avoid double counting
_log.error('Invalid contents in listing. bucket:%s'%bucket.name) if last_master is not None and obj['key'].startswith(last_master + '\x00'):
raise InvalidListing(bucket.name) _log.info('Detected versioned key: %s - subtracting master size: %i'% (
return count, total_size obj['key'],
last_size,
))
total_size -= last_size
count -= 1
last_master = None
def _extract_listing(self, key, listing): # Only save master versions
for status_code, payload in listing: elif '\x00' not in obj['key']:
contents = payload[key] if isinstance(payload, dict) else payload last_master = obj['key']
if contents is None: last_size = size
raise InvalidListing('')
for obj in contents: return count, total_size
yield obj
def count_bucket_contents(self, bucket): def count_bucket_contents(self, bucket):
def get_key_marker(p): def get_next_marker(p):
if p is None: if p is None or len(p) == 0:
return '' return ''
return p.get('NextKeyMarker', '') return p[-1].get('key', '')
def get_vid_marker(p):
if p is None:
return ''
return p.get('NextVersionIdMarker', '')
params = { params = {
'listingType': 'DelimiterVersions', 'listingType': 'Basic',
'maxKeys': 1000, 'maxKeys': 1000,
'keyMarker': get_key_marker, 'gt': get_next_marker,
'versionIdMarker': get_vid_marker,
} }
listing = self._list_bucket(bucket.name, **params) count, total_size = self._sum_objects(self._list_bucket(bucket.name, **params))
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
return BucketContents( return BucketContents(
bucket=bucket, bucket=bucket,
obj_count=count, obj_count=count,
@ -339,8 +237,7 @@ class BucketDClient:
) )
def count_mpu_parts(self, mpu): def count_mpu_parts(self, mpu):
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name _bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
def get_prefix(p): def get_prefix(p):
if p is None: if p is None:
@ -360,38 +257,19 @@ class BucketDClient:
'listingType': 'Delimiter', 'listingType': 'Delimiter',
} }
listing = self._list_bucket(shadow_bucket_name, **params) count, total_size = self._sum_objects(self._list_bucket(_bucket, **params))
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
return BucketContents( return BucketContents(
bucket=shadow_bucket, bucket=mpu.bucket._replace(name=_bucket),
obj_count=0, # MPU parts are not counted towards numberOfObjects obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size total_size=total_size
) )
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket): def index_bucket(client, bucket):
''' '''
Takes an instance of BucketDClient and a bucket name, and returns a Takes an instance of BucketDClient and a bucket name, and returns a
tuple of BucketContents for the passed bucket and its mpu shadow bucket. tuple of BucketContents for the passed bucket and its mpu shadow bucket.
''' '''
try:
bucket_total = client.count_bucket_contents(bucket) bucket_total = client.count_bucket_contents(bucket)
mpus = client.list_mpus(bucket) mpus = client.list_mpus(bucket)
if not mpus: if not mpus:
@ -403,10 +281,6 @@ def index_bucket(client, bucket):
total_size += mpu.total_size total_size += mpu.total_size
return bucket_total._replace(total_size=total_size) return bucket_total._replace(total_size=total_size)
except Exception as e:
_log.exception(e)
_log.error('Error during listing. Removing from results bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
def update_report(report, key, obj_count, total_size): def update_report(report, key, obj_count, total_size):
'''Convenience function to update the report dicts''' '''Convenience function to update the report dicts'''
@ -424,16 +298,9 @@ def get_redis_client(options):
host=options.sentinel_ip, host=options.sentinel_ip,
port=options.sentinel_port, port=options.sentinel_port,
db=0, db=0,
password=options.redis_password, password=options.redis_password
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
) )
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name) ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis( return redis.Redis(
host=ip, host=ip,
port=port, port=port,
@ -467,69 +334,40 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__': if __name__ == '__main__':
options = get_options() options = get_options()
if options.debug: if options.bucket is not None and not options.bucket.strip():
_log.setLevel(logging.DEBUG) print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked) bucket_client = BucketDClient(options.bucketd_addr)
redis_client = get_redis_client(options) redis_client = get_redis_client(options)
account_reports = {} account_reports = {}
observed_buckets = set() observed_buckets = set()
failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor: with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator: for batch in bucket_client.list_buckets(options.bucket):
bucket_reports = {} bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch } jobs = [executor.submit(index_bucket, bucket_client, b) for b in batch]
for job in futures.as_completed(jobs.keys()): for job in futures.as_completed(jobs):
try:
total = job.result() # Summed bucket and shadowbucket totals total = job.result() # Summed bucket and shadowbucket totals
except InvalidListing:
_bucket = jobs[job]
_log.error('Failed to list bucket %s. Removing from results.'%_bucket.name)
# Add the bucket to observed_buckets anyway to avoid clearing existing metrics
observed_buckets.add(_bucket.name)
# If we can not list one of an account's buckets we can not update its total
failed_accounts.add(_bucket.userid)
continue
observed_buckets.add(total.bucket.name) observed_buckets.add(total.bucket.name)
update_report(bucket_reports, total.bucket.name, total.obj_count, total.total_size) update_report(bucket_reports, total.bucket.name, total.obj_count, total.total_size)
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size) update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them # Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items(): for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size']) update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size']) log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute() pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets')) recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket: if options.bucket is None:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = recorded_buckets.difference(observed_buckets) stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket not in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
else:
stale_buckets = set()
_log.info('Found %s stale buckets' % len(stale_buckets)) _log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE): for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk: for bucket in chunk:
@ -538,46 +376,21 @@ if __name__ == '__main__':
pipeline.execute() pipeline.execute()
# Account metrics are not updated if a bucket is specified # Account metrics are not updated if a bucket is specified
if options.bucket: if options.bucket is None:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
else:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks # Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE): for chunk in chunks(account_reports.items(), ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk: for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size']) update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size']) log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute() pipeline.execute()
if options.account: observed_accounts = set(account_reports.keys())
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts')) recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values # Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts) stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts)) _log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE): for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk: for account in chunk:

View File

@ -52,9 +52,6 @@ const keys = {
getObjectRetention: prefix => `${prefix}GetObjectRetention`, getObjectRetention: prefix => `${prefix}GetObjectRetention`,
putObjectLegalHold: prefix => `${prefix}PutObjectLegalHold`, putObjectLegalHold: prefix => `${prefix}PutObjectLegalHold`,
getObjectLegalHold: prefix => `${prefix}GetObjectLegalHold`, getObjectLegalHold: prefix => `${prefix}GetObjectLegalHold`,
replicateObject: prefix => `${prefix}ReplicateObject`,
replicateTags: prefix => `${prefix}ReplicateTags`,
replicateDelete: prefix => `${prefix}ReplicateDelete`,
incomingBytes: prefix => `${prefix}incomingBytes`, incomingBytes: prefix => `${prefix}incomingBytes`,
outgoingBytes: prefix => `${prefix}outgoingBytes`, outgoingBytes: prefix => `${prefix}outgoingBytes`,
}; };
@ -68,10 +65,10 @@ const keys = {
*/ */
function getSchemaPrefix(params, timestamp) { function getSchemaPrefix(params, timestamp) {
const { const {
bucket, accountId, userId, level, service, location, bucket, accountId, userId, level, service,
} = params; } = params;
// `service` property must remain last because other objects also include it // `service` property must remain last because other objects also include it
const id = bucket || accountId || userId || location || service; const id = bucket || accountId || userId || service;
const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:` const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:`
: `${service}:${level}:${id}:`; : `${service}:${level}:${id}:`;
return prefix; return prefix;
@ -86,13 +83,9 @@ function getSchemaPrefix(params, timestamp) {
*/ */
function generateKey(params, metric, timestamp) { function generateKey(params, metric, timestamp) {
const prefix = getSchemaPrefix(params, timestamp); const prefix = getSchemaPrefix(params, timestamp);
if (params.location) {
return `${prefix}locationStorage`;
}
return keys[metric](prefix); return keys[metric](prefix);
} }
/** /**
* Returns a list of the counters for a metric type * Returns a list of the counters for a metric type
* @param {object} params - object with metric type and id as a property * @param {object} params - object with metric type and id as a property

View File

@ -7,6 +7,7 @@ const { Clustering, errors, ipCheck } = require('arsenal');
const arsenalHttps = require('arsenal').https; const arsenalHttps = require('arsenal').https;
const { Logger } = require('werelogs'); const { Logger } = require('werelogs');
const config = require('./Config');
const routes = require('../router/routes'); const routes = require('../router/routes');
const Route = require('../router/Route'); const Route = require('../router/Route');
const Router = require('../router/Router'); const Router = require('../router/Router');
@ -27,12 +28,7 @@ class UtapiServer {
constructor(worker, port, datastore, logger, config) { constructor(worker, port, datastore, logger, config) {
this.worker = worker; this.worker = worker;
this.port = port; this.port = port;
this.vault = config.vaultclient; this.router = new Router(config);
if (!this.vault) {
const Vault = require('./Vault');
this.vault = new Vault(config);
}
this.router = new Router(config, this.vault);
this.logger = logger; this.logger = logger;
this.datastore = datastore; this.datastore = datastore;
this.server = null; this.server = null;
@ -75,7 +71,6 @@ class UtapiServer {
req.socket.setNoDelay(); req.socket.setNoDelay();
const { query, path, pathname } = url.parse(req.url, true); const { query, path, pathname } = url.parse(req.url, true);
const utapiRequest = new UtapiRequest() const utapiRequest = new UtapiRequest()
.setVault(this.vault)
.setRequest(req) .setRequest(req)
.setLog(this.logger.newRequestLogger()) .setLog(this.logger.newRequestLogger())
.setResponse(res) .setResponse(res)
@ -219,7 +214,8 @@ class UtapiServer {
* @property {object} params.log - logger configuration * @property {object} params.log - logger configuration
* @return {undefined} * @return {undefined}
*/ */
function spawn(config) { function spawn(params) {
Object.assign(config, params);
const { const {
workers, redis, log, port, workers, redis, log, port,
} = config; } = config;

View File

@ -12,9 +12,12 @@ function orNull(value) {
return value === undefined ? null : value; return value === undefined ? null : value;
} }
const _multiprocessDB = {};
class MemoryCache { class MemoryCache {
constructor() { constructor(config) {
this._data = {}; this._data = config && config.multiprocess ? _multiprocessDB : {};
this._shards = {}; this._shards = {};
this._prefix = 'utapi'; this._prefix = 'utapi';
this._expirations = {}; this._expirations = {};

View File

@ -23,7 +23,7 @@ class CacheClient {
async pushMetric(metric) { async pushMetric(metric) {
const shard = shardFromTimestamp(metric.timestamp); const shard = shardFromTimestamp(metric.timestamp);
if (!(await this._cacheBackend.addToShard(shard, metric))) { if (!this._cacheBackend.addToShard(shard, metric)) {
return false; return false;
} }
await this._counterBackend.updateCounters(metric); await this._counterBackend.updateCounters(metric);

11
libV2/cache/index.js vendored
View File

@ -8,6 +8,15 @@ const cacheTypes = {
memory: () => new MemoryCache(), memory: () => new MemoryCache(),
}; };
function buildCacheClient(cacheConfig) {
const { backend, counter, cache } = cacheConfig;
return new CacheClient({
cacheBackend: cacheTypes[backend](cache),
counterBackend: cacheTypes[backend](counter),
});
}
// TODO remove after all users have been moved to buildCacheClient
const cacheBackend = cacheTypes[config.cache.backend](config.cache); const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis); const counterBackend = cacheTypes[config.cache.backend](config.redis);
@ -17,5 +26,7 @@ module.exports = {
MemoryCache, MemoryCache,
RedisCache, RedisCache,
}, },
// TODO remove after all users have been moved to buildCacheClient
client: new CacheClient({ cacheBackend, counterBackend }), client: new CacheClient({ cacheBackend, counterBackend }),
buildCacheClient,
}; };

64
libV2/cli.js Normal file
View File

@ -0,0 +1,64 @@
const { Command } = require('commander');
const _config = require('./config');
const { comprehend } = require('./utils');
const { logger } = require('./utils');
const availableSubsystems = [
'server',
'ingest',
'checkpoint',
'snapshot',
'repair',
'reindex',
'limit',
];
const enabledSubsystems = [];
const cli = new Command()
.option('--server', 'Start a Utapi metrics server', () =>
enabledSubsystems.push('server') && true)
.option('--ingest', 'Start the ingest task', () =>
enabledSubsystems.push('ingest') && true)
.option(
'--checkpoint',
'Start the checkpoint task scheduler',
() => enabledSubsystems.push('checkpoint') && true,
)
.option('--snapshot', 'Start the snapshot task scheduler', () =>
enabledSubsystems.push('snapshot') && true)
.option('--repair', 'Start the repair task scheduler', () =>
enabledSubsystems.push('repair') && true)
.option('--reindex', 'Start the reindex task scheduler', () =>
enabledSubsystems.push('reindex') && true)
.option(
'--now',
'Ignore configured schedules and execute specified background tasks immediately, exiting afterward.\n'
+ 'Can not be used with --server.',
);
class UtapiCLI {
static _parseSubsystems(config) {
return availableSubsystems.filter(sys => config[sys]);
// return comprehend(subsystems, (_, key) => ({
// key,
// value: !!config[key],
// }));
}
static parse(argv) {
const parsed = cli.parse(argv);
if (parsed.now && enabledSubsystems.includes('server')) {
throw new Error('--now can not be used with --server');
}
console.log(parsed)
const cliConfig = {
subsystems: UtapiCLI._parseSubsystems(parsed),
};
return _config.merge(cliConfig);
}
}
module.exports = UtapiCLI;

View File

@ -11,12 +11,7 @@ const encode = require('encoding-down');
/* eslint-enable import/no-extraneous-dependencies */ /* eslint-enable import/no-extraneous-dependencies */
const { UtapiMetric } = require('../models'); const { UtapiMetric } = require('../models');
const { const { LoggerContext, asyncOrCallback } = require('../utils');
LoggerContext,
logEventFilter,
asyncOrCallback,
buildFilterChain,
} = require('../utils');
const moduleLogger = new LoggerContext({ const moduleLogger = new LoggerContext({
module: 'client', module: 'client',
@ -89,11 +84,6 @@ class UtapiClient {
this._drainCanSchedule = true; this._drainCanSchedule = true;
this._drainDelay = (config && config.drainDelay) || 30000; this._drainDelay = (config && config.drainDelay) || 30000;
this._suppressedEventFields = (config && config.suppressedEventFields) || null; this._suppressedEventFields = (config && config.suppressedEventFields) || null;
const eventFilters = (config && config.filter) || {};
this._shouldPushMetric = buildFilterChain(eventFilters);
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => moduleLogger.info(...args), 'utapi event filter enabled', eventFilters);
}
} }
async join() { async join() {
@ -250,11 +240,6 @@ class UtapiClient {
? data ? data
: new UtapiMetric(data); : new UtapiMetric(data);
// If this event has been filtered then exit early
if (!this._shouldPushMetric(metric)) {
return;
}
// Assign a uuid if one isn't passed // Assign a uuid if one isn't passed
if (!metric.uuid) { if (!metric.uuid) {
metric.uuid = uuid.v4(); metric.uuid = uuid.v4();

View File

@ -16,13 +16,15 @@
"warp10": { "warp10": {
"host": "127.0.0.1", "host": "127.0.0.1",
"port": 4802, "port": 4802,
"nodeId": "single_node", "nodeId": "single_node"
"requestTimeout": 60000,
"connectTimeout": 60000
}, },
"healthChecks": { "healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"] "allowFrom": ["127.0.0.1/8", "::1"]
}, },
"vaultd": {
"host": "127.0.0.1",
"port": 8500
},
"cacheBackend": "memory", "cacheBackend": "memory",
"development": false, "development": false,
"nodeId": "single_node", "nodeId": "single_node",
@ -42,23 +44,5 @@
"diskUsage": { "diskUsage": {
"retentionDays": 45, "retentionDays": 45,
"expirationEnabled": false "expirationEnabled": false
},
"serviceUser": {
"arn": "arn:aws:iam::000000000000:user/scality-internal/service-utapi-user",
"enabled": false
},
"filter": {
"allow": {},
"deny": {}
},
"metrics" : {
"enabled": false,
"host": "localhost",
"ingestPort": 10902,
"checkpointPort": 10903,
"snapshotPort": 10904,
"diskUsagePort": 10905,
"reindexPort": 10906,
"repairPort": 10907
} }
} }

View File

@ -2,12 +2,8 @@ const fs = require('fs');
const path = require('path'); const path = require('path');
const Joi = require('@hapi/joi'); const Joi = require('@hapi/joi');
const assert = require('assert'); const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const { const { truthy, envNamespace } = require('../constants');
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
} = require('../constants');
const configSchema = require('./schema'); const configSchema = require('./schema');
// We need to require the specific file rather than the parent module to avoid a circular require // We need to require the specific file rather than the parent module to avoid a circular require
const { parseDiskSizeSpec } = require('../utils/disk'); const { parseDiskSizeSpec } = require('../utils/disk');
@ -73,6 +69,7 @@ class Config {
constructor(overrides) { constructor(overrides) {
this._basePath = path.join(__dirname, '../../'); this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath); this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this._defaultsPath = path.join(__dirname, 'defaults.json');
this.host = undefined; this.host = undefined;
this.port = undefined; this.port = undefined;
@ -90,11 +87,6 @@ class Config {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides); parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
} }
Object.assign(this, parsedConfig); Object.assign(this, parsedConfig);
werelogs.configure({
level: Config.logging.level,
dump: Config.logging.dumpLevel,
});
} }
static _readFile(path, encoding = 'utf-8') { static _readFile(path, encoding = 'utf-8') {
@ -119,7 +111,7 @@ class Config {
} }
_loadDefaults() { _loadDefaults() {
return defaults; return Config._readJSON(this._defaultsPath);
} }
_loadUserConfig() { _loadUserConfig() {
@ -184,9 +176,7 @@ class Config {
} }
static _parseRedisConfig(prefix, config) { static _parseRedisConfig(prefix, config) {
const redisConf = { const redisConf = {};
retry: config.retry,
};
if (config.sentinels || _definedInEnv(`${prefix}_SENTINELS`)) { if (config.sentinels || _definedInEnv(`${prefix}_SENTINELS`)) {
redisConf.name = _loadFromEnv(`${prefix}_NAME`, config.name); redisConf.name = _loadFromEnv(`${prefix}_NAME`, config.name);
redisConf.sentinels = _loadFromEnv( redisConf.sentinels = _loadFromEnv(
@ -198,10 +188,6 @@ class Config {
`${prefix}_SENTINEL_PASSWORD`, `${prefix}_SENTINEL_PASSWORD`,
config.sentinelPassword, config.sentinelPassword,
); );
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
} else { } else {
redisConf.host = _loadFromEnv( redisConf.host = _loadFromEnv(
`${prefix}_HOST`, `${prefix}_HOST`,
@ -239,28 +225,6 @@ class Config {
return certs; return certs;
} }
static _parseResourceFilters(config) {
const resourceFilters = {};
allowedFilterFields.forEach(
field => allowedFilterStates.forEach(
state => {
const configResources = (config[state] && config[state][field]) || null;
const envVar = `FILTER_${field.toUpperCase()}_${state.toUpperCase()}`;
const resources = _loadFromEnv(envVar, configResources, _typeCasts.list);
if (resources) {
if (resourceFilters[field]) {
throw new Error('You can not define both an allow and a deny list for an event field.');
}
resourceFilters[field] = { [state]: new Set(resources) };
}
},
),
);
return resourceFilters;
}
_parseConfig(config) { _parseConfig(config) {
const parsedConfig = {}; const parsedConfig = {};
@ -299,8 +263,6 @@ class Config {
const warp10Conf = { const warp10Conf = {
readToken: _loadFromEnv('WARP10_READ_TOKEN', config.warp10.readToken), readToken: _loadFromEnv('WARP10_READ_TOKEN', config.warp10.readToken),
writeToken: _loadFromEnv('WARP10_WRITE_TOKEN', config.warp10.writeToken), writeToken: _loadFromEnv('WARP10_WRITE_TOKEN', config.warp10.writeToken),
requestTimeout: _loadFromEnv('WARP10_REQUEST_TIMEOUT', config.warp10.requestTimeout, _typeCasts.int),
connectTimeout: _loadFromEnv('WARP10_CONNECT_TIMEOUT', config.warp10.connectTimeout, _typeCasts.int),
}; };
if (Array.isArray(config.warp10.hosts) || _definedInEnv('WARP10_HOSTS')) { if (Array.isArray(config.warp10.hosts) || _definedInEnv('WARP10_HOSTS')) {
@ -378,24 +340,6 @@ class Config {
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList); parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
parsedConfig.serviceUser = {
arn: _loadFromEnv('SERVICE_USER_ARN', config.serviceUser.arn),
enabled: _loadFromEnv('SERVICE_USER_ENABLED', config.serviceUser.enabled, _typeCasts.bool),
};
parsedConfig.filter = Config._parseResourceFilters(config.filter);
parsedConfig.metrics = {
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
};
return parsedConfig; return parsedConfig;
} }
@ -406,7 +350,7 @@ class Config {
* @param {object} newConfig - an object using the same structure as the config file * @param {object} newConfig - an object using the same structure as the config file
* @returns {Config} - New Config instance * @returns {Config} - New Config instance
*/ */
static merge(newConfig) { merge(newConfig) {
return new Config(newConfig); return new Config(newConfig);
} }
} }

View File

@ -1,23 +1,9 @@
const Joi = require('@hapi/joi'); const Joi = require('@hapi/joi');
const { allowedFilterFields, allowedFilterStates } = require('../constants');
const backoffSchema = Joi.object({
min: Joi.number(),
max: Joi.number(),
deadline: Joi.number(),
jitter: Joi.number(),
factor: Joi.number(),
});
const redisRetrySchema = Joi.object({
connectBackoff: backoffSchema,
});
const redisServerSchema = Joi.object({ const redisServerSchema = Joi.object({
host: Joi.string(), host: Joi.string(),
port: Joi.number(), port: Joi.number(),
password: Joi.string().allow(''), password: Joi.string().allow(''),
retry: redisRetrySchema,
}); });
const redisSentinelSchema = Joi.object({ const redisSentinelSchema = Joi.object({
@ -28,7 +14,6 @@ const redisSentinelSchema = Joi.object({
})), })),
password: Joi.string().default('').allow(''), password: Joi.string().default('').allow(''),
sentinelPassword: Joi.string().default('').allow(''), sentinelPassword: Joi.string().default('').allow(''),
retry: redisRetrySchema,
}); });
const warp10SingleHost = Joi.object({ const warp10SingleHost = Joi.object({
@ -100,31 +85,7 @@ const schema = Joi.object({
expirationEnabled: Joi.boolean(), expirationEnabled: Joi.boolean(),
hardLimit: Joi.string(), hardLimit: Joi.string(),
}), }),
serviceUser: Joi.object({
arn: Joi.string(),
enabled: Joi.boolean(),
}),
filter: Joi.object(allowedFilterStates.reduce(
(filterObj, state) => {
filterObj[state] = allowedFilterFields.reduce(
(stateObj, field) => {
stateObj[field] = Joi.array().items(Joi.string());
return stateObj;
}, {},
);
return filterObj;
}, {},
)),
metrics: {
enabled: Joi.boolean(),
host: Joi.string(),
ingestPort: Joi.number().port(),
checkpointPort: Joi.number().port(),
snapshotPort: Joi.number().port(),
diskUsagePort: Joi.number().port(),
reindexPort: Joi.number().port(),
repairPort: Joi.number().port(),
},
}); });
module.exports = schema; module.exports = schema;

View File

@ -19,23 +19,17 @@ const constants = {
'createBucket', 'createBucket',
'deleteBucket', 'deleteBucket',
'deleteBucketCors', 'deleteBucketCors',
'deleteBucketEncryption',
'deleteBucketLifecycle',
'deleteBucketReplication', 'deleteBucketReplication',
'deleteBucketTagging',
'deleteBucketWebsite', 'deleteBucketWebsite',
'deleteObject', 'deleteObject',
'deleteObjectTagging', 'deleteObjectTagging',
'getBucketAcl', 'getBucketAcl',
'getBucketCors', 'getBucketCors',
'getBucketEncryption',
'getBucketLifecycle',
'getBucketLocation', 'getBucketLocation',
'getBucketNotification', 'getBucketNotification',
'getBucketObjectLock', 'getBucketObjectLock',
'getBucketReplication', 'getBucketReplication',
'getBucketVersioning', 'getBucketVersioning',
'getBucketTagging',
'getBucketWebsite', 'getBucketWebsite',
'getObject', 'getObject',
'getObjectAcl', 'getObjectAcl',
@ -51,23 +45,18 @@ const constants = {
'multiObjectDelete', 'multiObjectDelete',
'putBucketAcl', 'putBucketAcl',
'putBucketCors', 'putBucketCors',
'putBucketEncryption',
'putBucketLifecycle',
'putBucketNotification', 'putBucketNotification',
'putBucketObjectLock', 'putBucketObjectLock',
'putBucketReplication', 'putBucketReplication',
'putBucketVersioning', 'putBucketVersioning',
'putBucketTagging',
'putBucketWebsite', 'putBucketWebsite',
'putData',
'putDeleteMarkerObject', 'putDeleteMarkerObject',
'putObject', 'putObject',
'putObjectAcl', 'putObjectAcl',
'putObjectLegalHold', 'putObjectLegalHold',
'putObjectRetention', 'putObjectRetention',
'putObjectTagging', 'putObjectTagging',
'replicateDelete',
'replicateObject',
'replicateTags',
'uploadPart', 'uploadPart',
'uploadPartCopy', 'uploadPartCopy',
], ],
@ -114,14 +103,6 @@ const constants = {
putDeleteMarkerObject: 'deleteObject', putDeleteMarkerObject: 'deleteObject',
}, },
expirationChunkDuration: 900000000, // 15 minutes in microseconds expirationChunkDuration: 900000000, // 15 minutes in microseconds
allowedFilterFields: [
'operationId',
'location',
'account',
'user',
'bucket',
],
allowedFilterStates: ['allow', 'deny'],
}; };
constants.operationToResponse = constants.operations constants.operationToResponse = constants.operations

View File

@ -1,5 +1,5 @@
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
const bucketclient = require('bucketclient'); const bucketclient = require('bucketclient');
const { BucketClientInterface } = require('arsenal').storage.metadata.bucketclient;
const config = require('../config'); const config = require('../config');
const { LoggerContext } = require('../utils'); const { LoggerContext } = require('../utils');
@ -10,7 +10,7 @@ const moduleLogger = new LoggerContext({
const params = { const params = {
bucketdBootstrap: config.bucketd, bucketdBootstrap: config.bucketd,
https: config.tls, https: config.https,
}; };
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger); module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

View File

@ -1,25 +1,16 @@
/* eslint-disable no-restricted-syntax */ /* eslint-disable no-restricted-syntax */
const arsenal = require('arsenal'); const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal').constants;
const async = require('async');
const metadata = require('./client'); const metadata = require('./client');
const { LoggerContext, logger } = require('../utils'); const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants'); const { keyVersionSplitter } = require('../constants');
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = arsenal.constants;
const { BucketInfo } = arsenal.models;
const moduleLogger = new LoggerContext({ const moduleLogger = new LoggerContext({
module: 'metadata.client', module: 'metadata.client',
}); });
const ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
const PAGE_SIZE = 1000; const PAGE_SIZE = 1000;
async function _listingWrapper(bucket, params) { function _listingWrapper(bucket, params) {
return new Promise( return new Promise(
(resolve, reject) => metadata.listObject( (resolve, reject) => metadata.listObject(
bucket, bucket,
@ -46,7 +37,7 @@ function _listObject(bucket, prefix, hydrateFunc) {
try { try {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt }); res = await _listingWrapper(bucket, { ...listingParams, gt });
} catch (error) { } catch (error) {
moduleLogger.error('Error during listing', { error }); moduleLogger.error('Error during listing', { error });
throw error; throw error;
@ -108,7 +99,7 @@ function bucketExists(bucket) {
bucket, bucket,
logger.newRequestLogger(), logger.newRequestLogger(),
err => { err => {
if (err && (!err.is || !err.is.NoSuchBucket)) { if (err && !err.NoSuchBucket) {
reject(err); reject(err);
return; return;
} }
@ -117,25 +108,9 @@ function bucketExists(bucket) {
)); ));
} }
function getBucket(bucket) {
return new Promise((resolve, reject) => {
metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(), (err, data) => {
if (err) {
reject(err);
return;
}
resolve(BucketInfo.fromObj(data));
},
);
});
}
module.exports = { module.exports = {
listBuckets, listBuckets,
listObjects, listObjects,
listMPUs, listMPUs,
bucketExists, bucketExists,
getBucket,
}; };

View File

@ -3,7 +3,6 @@ const Joi = require('@hapi/joi');
const { buildModel } = require('./Base'); const { buildModel } = require('./Base');
const { apiOperations } = require('../server/spec'); const { apiOperations } = require('../server/spec');
const ResponseContainer = require('./ResponseContainer'); const ResponseContainer = require('./ResponseContainer');
const { httpRequestDurationSeconds } = require('../server/metrics');
const apiTags = Object.keys(apiOperations); const apiTags = Object.keys(apiOperations);
const apiOperationIds = Object.values(apiOperations) const apiOperationIds = Object.values(apiOperations)
@ -22,7 +21,6 @@ const contextSchema = {
logger: Joi.any(), logger: Joi.any(),
request: Joi.any(), request: Joi.any(),
results: Joi.any(), results: Joi.any(),
requestTimer: Joi.any(),
}; };
const RequestContextModel = buildModel('RequestContext', contextSchema); const RequestContextModel = buildModel('RequestContext', contextSchema);
@ -36,10 +34,6 @@ class RequestContext extends RequestContextModel {
const tag = request.swagger.operation['x-router-controller']; const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation; const { operationId } = request.swagger.operation;
const requestTimer = tag !== 'internal'
? httpRequestDurationSeconds.startTimer({ action: operationId })
: null;
request.logger.logger.addDefaultFields({ request.logger.logger.addDefaultFields({
tag, tag,
operationId, operationId,
@ -56,7 +50,6 @@ class RequestContext extends RequestContextModel {
encrypted, encrypted,
results: new ResponseContainer(), results: new ResponseContainer(),
logger: request.logger, logger: request.logger,
requestTimer,
}); });
} }

View File

@ -1,16 +1,39 @@
const { EventEmitter } = require('events'); const { EventEmitter } = require('events');
const os = require('os'); const os = require('os');
const { Command } = require('commander'); const async = require('async');
const { logger } = require('./utils'); const { logger, comprehend } = require('./utils');
const UtapiCLI = require('./cli');
const { UtapiServer } = require('./server');
const {
IngestShard,
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MonitorDiskUsage,
} = require('./tasks');
const subsystems = {
server: UtapiServer,
ingest: IngestShard,
checkpoint: CreateCheckpoint,
snapshot: CreateSnapshot,
repair: RepairTask,
reindex: ReindexTask,
limit: MonitorDiskUsage,
// TODO split expiration into separate task
// expiration:
};
class Process extends EventEmitter { class Process extends EventEmitter {
constructor(...options) { constructor() {
super(...options); super();
this._program = null; this._config = null;
this._subsystems = null;
} }
async setup() { _registerSignalHandlers() {
const cleanUpFunc = this.join.bind(this); const cleanUpFunc = this.join.bind(this, 1);
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => { ['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, cleanUpFunc); process.on(eventName, cleanUpFunc);
}); });
@ -19,27 +42,78 @@ class Process extends EventEmitter {
{ error, stack: error.stack.split(os.EOL) }); { error, stack: error.stack.split(os.EOL) });
cleanUpFunc(); cleanUpFunc();
}); });
this._program = new Command(); }
await this._setup();
async setup() {
this._registerSignalHandlers();
try {
this._config = UtapiCLI.parse(process.argv);
} catch(error) {
console.log(error.message);
return false;
}
// console.log(this._config)
this._subsystems = await Process._setupSubSystems(this._config);
return true;
}
static async _setupSubSystems(config) {
return async.reduce(config.subsystems, {},
// const systems = comprehend(
// config.subsystems,
async (systems, key) => {
const sys = new subsystems[key](config);
await sys.setup();
systems[key] = sys;
return systems;
},
);
} }
async start() { async start() {
this._program.parse(process.argv); if (!this._subsystems) {
await this._start(); throw new Error('The process must be setup before starting!');
}
// console.log(this._subsystems)
await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.start();
} catch (error) {
const msg = `Error starting subsystem ${name}`;
logger.error(msg, { error });
throw new Error(msg);
}
}),
);
} }
async join() { async join(returnCode = 0) {
this.emit('exit'); this.emit('exit');
await this._join(); console.log('-'.repeat(50))
if (this._subsystems) {
const results = await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.join();
} catch (error) {
logger.error(`Error stopping subsystem ${name}`, { error });
return name;
}
return null;
}),
);
const errors = results.filter(e => e !== null);
if (errors.length) {
logger.error(`Error stopping subsystems: ${errors.join(', ')}`, { subsystems: errors });
process.exit(1);
}
} }
/* eslint-disable class-methods-use-this,no-empty-function */ process.exit(returnCode);
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
} }
}
module.exports = Process; module.exports = Process;

View File

@ -2,12 +2,9 @@ const EventEmitter = require('events');
const { callbackify, promisify } = require('util'); const { callbackify, promisify } = require('util');
const IORedis = require('ioredis'); const IORedis = require('ioredis');
const { jsutil } = require('arsenal'); const { jsutil } = require('arsenal');
const BackOff = require('backo');
const { whilst } = require('async');
const errors = require('./errors'); const errors = require('./errors');
const { LoggerContext } = require('./utils/log'); const { LoggerContext, asyncOrCallback } = require('./utils');
const { asyncOrCallback } = require('./utils/func');
const moduleLogger = new LoggerContext({ const moduleLogger = new LoggerContext({
module: 'redis', module: 'redis',
@ -71,7 +68,6 @@ class RedisClient extends EventEmitter {
this._redis.off('connect', this._onConnect); this._redis.off('connect', this._onConnect);
this._redis.off('ready', this._onReady); this._redis.off('ready', this._onReady);
this._redis.off('error', this._onError); this._redis.off('error', this._onError);
this._redis.disconnect();
} }
this._isConnected = false; this._isConnected = false;
this._isReady = false; this._isReady = false;
@ -106,7 +102,6 @@ class RedisClient extends EventEmitter {
} }
_onError(error) { _onError(error) {
this._isReady = false;
moduleLogger.error('error connecting to redis', { error }); moduleLogger.error('error connecting to redis', { error });
if (this.listenerCount('error') > 0) { if (this.listenerCount('error') > 0) {
this.emit('error', error); this.emit('error', error);
@ -139,63 +134,20 @@ class RedisClient extends EventEmitter {
} }
async _call(asyncFunc) { async _call(asyncFunc) {
const start = Date.now();
const { connectBackoff } = this._redisOptions.retry || {};
const backoff = new BackOff(connectBackoff);
const timeoutMs = (connectBackoff || {}).deadline || 2000;
let retried = false;
return new Promise((resolve, reject) => {
whilst(
next => { // WARNING: test is asynchronous in `async` v3
if (!connectBackoff && !this.isReady) {
moduleLogger.warn('redis not ready and backoff is not configured');
}
process.nextTick(next, null, !!connectBackoff && !this.isReady);
},
next => {
retried = true;
if ((Date.now() - start) > timeoutMs) {
moduleLogger.error('redis still not ready after max wait, giving up', { timeoutMs });
return next(errors.InternalError.customizeDescription(
'redis client is not ready',
));
}
const backoffDurationMs = backoff.duration();
moduleLogger.error('redis not ready, retrying', { backoffDurationMs });
return setTimeout(next, backoffDurationMs);
},
err => {
if (err) {
return reject(err);
}
if (retried) {
moduleLogger.info('redis connection recovered', {
recoveryOverheadMs: Date.now() - start,
});
}
const funcPromise = asyncFunc(this._redis); const funcPromise = asyncFunc(this._redis);
if (!this._useTimeouts) { if (!this._useTimeouts) {
// If timeouts are disabled simply return the Promise // If timeouts are disabled simply return the Promise
return resolve(funcPromise); return funcPromise;
} }
const { timeout, cancelTimeout } = this._createCommandTimeout(); const { timeout, cancelTimeout } = this._createCommandTimeout();
try { try {
// timeout always rejects so we can just return // timeout always rejects so we can just return
return resolve(Promise.race([funcPromise, timeout])); return await Promise.race([funcPromise, timeout]);
} finally { } finally {
cancelTimeout(); cancelTimeout();
} }
},
);
});
} }
call(func, callback) { call(func, callback) {

View File

@ -1,14 +0,0 @@
const { collectDefaultMetrics, register } = require('prom-client');
collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
async function prometheusMetrics(ctx) {
// eslint-disable-next-line no-param-reassign
ctx.results.statusCode = 200;
ctx.results.body = await register.metrics();
}
module.exports = prometheusMetrics;

View File

@ -40,7 +40,7 @@ async function getStorage(ctx, params) {
macro: 'utapi/getMetricsAt', macro: 'utapi/getMetricsAt',
}; };
return warp10.exec(options); return warp10.exec(options);
}, error => ctx.logger.error('error while fetching metrics', { error })); });
if (res.result.length === 0) { if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { level, resource }); ctx.logger.error('unable to retrieve metrics', { level, resource });

View File

@ -28,11 +28,8 @@ async function listMetric(ctx, params) {
end = Date.now(); end = Date.now();
} }
let results;
try {
// A separate request will be made to warp 10 per requested resource // A separate request will be made to warp 10 per requested resource
results = await Promise.all( const results = await Promise.all(
resources.map(async ({ resource, id }) => { resources.map(async ({ resource, id }) => {
const labels = { [labelName]: id }; const labels = { [labelName]: id };
@ -47,14 +44,7 @@ async function listMetric(ctx, params) {
macro: 'utapi/getMetrics', macro: 'utapi/getMetrics',
}; };
return warp10.exec(options); return warp10.exec(options);
}, error => ctx.logger.error('error during warp 10 request', { });
error,
requestParams: {
start,
end,
labels,
},
}));
if (res.result.length === 0) { if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { resource, type: params.level }); ctx.logger.error('unable to retrieve metrics', { resource, type: params.level });
@ -79,11 +69,6 @@ async function listMetric(ctx, params) {
}; };
}), }),
); );
} catch (error) {
ctx.logger.error('error fetching metrics from warp10', { error });
throw errors.InternalError;
}
// Convert the results from warp10 into the expected response format // Convert the results from warp10 into the expected response format
const resp = results const resp = results

View File

@ -4,8 +4,9 @@ const express = require('express');
const bodyParser = require('body-parser'); const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https; const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process'); const SubSystem = require('../subsystem');
const config = require('../config');
// const config = require('../config');
const { initializeOasTools, middleware } = require('./middleware'); const { initializeOasTools, middleware } = require('./middleware');
const { spec: apiSpec } = require('./spec'); const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache'); const { client: cacheClient } = require('../cache');
@ -15,11 +16,12 @@ const moduleLogger = new LoggerContext({
module: 'server', module: 'server',
}); });
class UtapiServer extends Process { class UtapiServer extends SubSystem {
constructor() { constructor(config) {
super(); super();
this._app = null; this._app = null;
this._server = null; this._server = null;
this._config = config;
} }
static async _createApp(spec) { static async _createApp(spec) {
@ -28,18 +30,17 @@ class UtapiServer extends Process {
app.use(middleware.loggerMiddleware); app.use(middleware.loggerMiddleware);
await initializeOasTools(spec, app); await initializeOasTools(spec, app);
app.use(middleware.errorMiddleware); app.use(middleware.errorMiddleware);
app.use(middleware.httpMetricsMiddleware);
app.use(middleware.responseLoggerMiddleware); app.use(middleware.responseLoggerMiddleware);
return app; return app;
} }
static _createHttpsAgent() { _createHttpsAgent() {
const conf = { const conf = {
ciphers: ciphers.ciphers, ciphers: ciphers.ciphers,
dhparam: dhparam.dhparam, dhparam,
cert: config.tls.cert, cert: this._config.tls.cert,
key: config.tls.key, key: this._config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null, ca: this._config.tls.ca ? [this._config.tls.ca] : null,
requestCert: false, requestCert: false,
rejectUnauthorized: true, rejectUnauthorized: true,
}; };
@ -48,31 +49,31 @@ class UtapiServer extends Process {
return conf; return conf;
} }
static async _createServer(app) { async _createServer(app) {
if (config.tls) { if (this._config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app); return https.createServer(UtapiServer._createHttpsAgent(), app);
} }
return http.createServer(app); return http.createServer(app);
} }
static async _startServer(server) { async _startServer(server) {
moduleLogger moduleLogger
.with({ .with({
method: 'UtapiServer::_startServer', method: 'UtapiServer::_startServer',
cacheBackend: config.cacheBackend, cacheBackend: this._config.cacheBackend,
}) })
.info(`Server listening on ${config.port}`); .info(`Server listening on ${this._config.port}`);
await server.listen(config.port); await server.listen(this._config.port);
} }
async _setup() { async _setup() {
this._app = await UtapiServer._createApp(apiSpec); this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app); this._server = await this._createServer(this._app);
} }
async _start() { async _start() {
await cacheClient.connect(); await cacheClient.connect();
await UtapiServer._startServer(this._server); await this._startServer(this._server);
} }
async _join() { async _join() {

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 's3_utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 's3_utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s
buckets: [0.0001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0],
});
module.exports = {
httpRequestDurationSeconds,
httpRequestsTotal,
};

View File

@ -6,7 +6,6 @@ const config = require('../config');
const { logger, buildRequestLogger } = require('../utils'); const { logger, buildRequestLogger } = require('../utils');
const errors = require('../errors'); const errors = require('../errors');
const { translateAndAuthorize } = require('../vault'); const { translateAndAuthorize } = require('../vault');
const metricHandlers = require('./metrics');
const oasOptions = { const oasOptions = {
controllers: path.join(__dirname, './API/'), controllers: path.join(__dirname, './API/'),
@ -56,23 +55,6 @@ function responseLoggerMiddleware(req, res, next) {
} }
} }
function httpMetricsMiddleware(request, response, next) {
// If the request.ctx is undefined then this is an internal oasTools request (/_/docs)
// No metrics should be pushed
if (config.metrics.enabled && request.ctx && request.ctx.tag !== 'internal') {
metricHandlers.httpRequestsTotal
.labels({
action: request.ctx.operationId,
code: response.statusCode,
}).inc(1);
request.ctx.requestTimer({ code: response.statusCode });
}
if (next) {
next();
}
}
// next is purposely not called as all error responses are handled here // next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) { function errorMiddleware(err, req, res, next) {
@ -100,7 +82,7 @@ function errorMiddleware(err, req, res, next) {
code, code,
message, message,
}); });
responseLoggerMiddleware(req, res, () => httpMetricsMiddleware(req, res)); responseLoggerMiddleware(req, res);
} }
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
@ -117,7 +99,7 @@ async function authV4Middleware(request, response, params) {
switch (request.ctx.operationId) { switch (request.ctx.operationId) {
case 'listMetrics': case 'listMetrics':
requestedResources = params.body[params.level]; requestedResources = params.body[params.level];
action = params.Action; action = params.Action.value;
break; break;
default: default:
@ -136,10 +118,6 @@ async function authV4Middleware(request, response, params) {
[passed, authorizedResources] = await translateAndAuthorize(request, action, params.level, requestedResources); [passed, authorizedResources] = await translateAndAuthorize(request, action, params.level, requestedResources);
} catch (error) { } catch (error) {
request.logger.error('error during authentication', { error }); request.logger.error('error during authentication', { error });
// rethrow any access denied errors
if ((error.is && error.is.AccessDenied) || (error.utapiError && error.AccessDenied)) {
throw error;
}
throw errors.InternalError; throw errors.InternalError;
} }
@ -176,6 +154,5 @@ module.exports = {
responseLoggerMiddleware, responseLoggerMiddleware,
authV4Middleware, authV4Middleware,
clientIpLimitMiddleware, clientIpLimitMiddleware,
httpMetricsMiddleware,
}, },
}; };

31
libV2/subsystem.js Normal file
View File

@ -0,0 +1,31 @@
const { EventEmitter } = require('events');
class SubSystem extends EventEmitter {
constructor(config) {
super();
this._config = config;
}
async setup() {
await this._setup(this._config);
}
async start() {
await this._start();
}
async join() {
this.emit('exit');
await this._join();
}
/* eslint-disable class-methods-use-this,no-empty-function */
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
}
module.exports = SubSystem;

View File

@ -1,12 +1,12 @@
const assert = require('assert'); const assert = require('assert');
const cron = require('node-schedule'); const cron = require('node-schedule');
const cronparser = require('cron-parser'); const cronparser = require('cron-parser');
const promClient = require('prom-client');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { client: cacheClient } = require('../cache'); const { client: cacheClient, buildCacheClient } = require('../cache');
const Process = require('../process'); const SubSystem = require('../subsystem');
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
const { LoggerContext, iterIfError } = require('../utils');
const { buildWarp10Clients } = require('../warp10');
const logger = new LoggerContext({ const logger = new LoggerContext({
module: 'BaseTask', module: 'BaseTask',
@ -14,123 +14,51 @@ const logger = new LoggerContext({
class Now {} class Now {}
class BaseTask extends Process { class BaseTask extends SubSystem {
constructor(options) { constructor(config) {
super(); super(config);
assert.notStrictEqual(options, undefined); // TODO construct cache client here rather than globally
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients'); this._cache = null;
this._cache = cacheClient; this._warp10Clients = null;
this._warp10Clients = options.warp10;
this._scheduler = null; this._scheduler = null;
this._defaultSchedule = Now; this._defaultSchedule = Now;
this._defaultLag = 0; this._defaultLag = 0;
this._enableMetrics = options.enableMetrics || false;
this._metricsHost = options.metricsHost || 'localhost';
this._metricsPort = options.metricsPort || 9001;
this._metricsHandlers = null;
this._probeServer = null;
} }
async _setup(includeDefaultOpts = true) { async _setup(config) {
if (includeDefaultOpts) { this._nodeId = config.nodeId;
this._program this._cache = buildCacheClient(config.cache);
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.') this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
.option( // if (includeDefaultOpts) {
'-s, --schedule <crontab>', // this._program
'Execute task using this crontab. Overrides configured schedule', // .option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
value => { // .option(
cronparser.parseExpression(value); // '-s, --schedule <crontab>',
return value; // 'Execute task using this crontab. Overrides configured schedule',
}, // value => {
) // cronparser.parseExpression(value);
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10)) // return value;
.option('-n, --node-id <id>', 'Set a custom node id'); // },
} // )
// .option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
if (this._enableMetrics) { // .option('-n, --node-id <id>', 'Set a custom node id');
promClient.collectDefaultMetrics({ // }
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),
};
await this._createProbeServer();
}
}
_registerDefaultMetricHandlers() {
const taskName = this.constructor.name;
// Get the name of our subclass in snake case format eg BaseClass => _base_class
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `s3_utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_attempts_total`,
help: `Total number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_failures_total`,
help: `Total number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
return {
executionDuration,
executionAttempts,
executionFailures,
};
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
return {};
}
async _createProbeServer() {
this._probeServer = await startProbeServer({
bindAddress: this._metricsHost,
port: this._metricsPort,
});
this._probeServer.addHandler(
DEFAULT_METRICS_ROUTE,
(res, log) => {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': promClient.register.contentType,
});
promClient.register.metrics().then(metrics => {
res.end(metrics);
});
},
);
} }
get schedule() { get schedule() {
if (this._program.now) { // if (this._program.now) {
return Now; // return Now;
} // }
if (this._program.schedule) { // if (this._program.schedule) {
return this._program.schedule; // return this._program.schedule;
} // }
return this._defaultSchedule; return this._defaultSchedule;
} }
get lag() { get lag() {
if (this._program.lag !== undefined) { // if (this._program.lag !== undefined) {
return this._program.lag; // return this._program.lag;
} // }
return this._defaultLag; return this._defaultLag;
} }
@ -155,23 +83,12 @@ class BaseTask extends Process {
} }
async execute() { async execute() {
let endTimer;
if (this._enableMetrics) {
endTimer = this._metricsHandlers.executionDuration.startTimer();
this._metricsHandlers.executionAttempts.inc(1);
}
try { try {
const timestamp = new Date() * 1000; // Timestamp in microseconds; const timestamp = new Date() * 1000; // Timestamp in microseconds;
const laggedTimestamp = timestamp - (this.lag * 1000000); const laggedTimestamp = timestamp - (this.lag * 1000000);
await this._execute(laggedTimestamp); await this._execute(laggedTimestamp);
} catch (error) { } catch (error) {
logger.error('Error during task execution', { error }); logger.error('Error during task execution', { error });
this._metricsHandlers.executionFailures.inc(1);
}
if (this._enableMetrics) {
endTimer();
} }
} }
@ -181,9 +98,6 @@ class BaseTask extends Process {
} }
async _join() { async _join() {
if (this._probeServer !== null) {
this._probeServer.stop();
}
return this._cache.disconnect(); return this._cache.disconnect();
} }

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants'); const { checkpointLagSecs, indexedEventFields } = require('../constants');
@ -10,88 +9,11 @@ const logger = new LoggerContext({
class CreateCheckpoint extends BaseTask { class CreateCheckpoint extends BaseTask {
constructor(options) { constructor(options) {
super({ super(options);
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.checkpointPort,
...options,
});
this._defaultSchedule = config.checkpointSchedule; this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs; this._defaultLag = checkpointLagSecs;
} }
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_checkpoint_created_total',
help: 'Total number of checkpoints created',
labelNames: ['origin', 'containerName'],
});
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 's3_utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastCheckpoint();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastCheckpoint,
};
}
/**
* Metrics for CreateCheckpoint
* @typedef {Object} CreateCheckpointMetrics
* @property {number} created - Number of checkpoints created
*/
/**
*
* @param {CreateCheckpointMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastCheckpoint() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.checkpoint.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) { async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp }); logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => { const status = await this.withWarp10(async warp10 => {
@ -107,7 +29,6 @@ class CreateCheckpoint extends BaseTask {
}); });
if (status.result[0]) { if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`); logger.info(`created ${status.result[0] || 0} checkpoints`);
this._pushMetrics({ created: status.result[0] });
} }
} }
} }

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
const { snapshotLagSecs } = require('../constants'); const { snapshotLagSecs } = require('../constants');
@ -10,88 +9,11 @@ const logger = new LoggerContext({
class CreateSnapshot extends BaseTask { class CreateSnapshot extends BaseTask {
constructor(options) { constructor(options) {
super({ super(options);
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.snapshotPort,
...options,
});
this._defaultSchedule = config.snapshotSchedule; this._defaultSchedule = config.snapshotSchedule;
this._defaultLag = snapshotLagSecs; this._defaultLag = snapshotLagSecs;
} }
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_snapshot_created_total',
help: 'Total number of snapshots created',
labelNames: ['origin', 'containerName'],
});
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 's3_utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastSnapshot();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastSnapshot,
};
}
/**
* Metrics for CreateSnapshot
* @typedef {Object} CreateSnapshotMetrics
* @property {number} created - Number of snapshots created
*/
/**
*
* @param {CreateSnapshotMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastSnapshot() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) { async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp }); logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
@ -107,7 +29,6 @@ class CreateSnapshot extends BaseTask {
}); });
if (status.result[0]) { if (status.result[0]) {
logger.info(`created ${status.result[0]} snapshots`); logger.info(`created ${status.result[0]} snapshots`);
this._pushMetrics({ created: status.result[0] });
} }
} }
} }

View File

@ -1,7 +1,4 @@
const async = require('async'); const async = require('async');
const Path = require('path');
const fs = require('fs');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
const { expirationChunkDuration } = require('../constants'); const { expirationChunkDuration } = require('../constants');
@ -19,13 +16,9 @@ const ACTION_THRESHOLD = 0.95;
class MonitorDiskUsage extends BaseTask { class MonitorDiskUsage extends BaseTask {
constructor(options) { constructor(options) {
super({ super(
enableMetrics: config.metrics.enabled, options,
metricsHost: config.metrics.host, );
metricsPort: config.metrics.diskUsagePort,
...options,
});
this._defaultSchedule = config.diskUsageSchedule; this._defaultSchedule = config.diskUsageSchedule;
this._defaultLag = 0; this._defaultLag = 0;
this._path = config.diskUsage.path; this._path = config.diskUsage.path;
@ -49,88 +42,6 @@ class MonitorDiskUsage extends BaseTask {
); );
} }
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const isLocked = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_is_locked',
help: 'Indicates whether the monitored warp 10 has had writes disabled',
labelNames: ['origin', 'containerName'],
});
const leveldbBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10 leveldb',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'],
});
const hardLimitRatio = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_ratio',
help: 'Percent of the hard limit used by warp 10',
labelNames: ['origin', 'containerName'],
});
const hardLimitSetting = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_bytes',
help: 'The hard limit setting in bytes',
labelNames: ['origin', 'containerName'],
});
return {
isLocked,
leveldbBytes,
datalogBytes,
hardLimitRatio,
hardLimitSetting,
};
}
/**
* Metrics for MonitorDiskUsage
* @typedef {Object} MonitorDiskUsageMetrics
* @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10
* @property {number} leveldbBytes - Total bytes used by warp 10 leveldb
* @property {number} datalogBytes - Total bytes used by warp 10 datalog
* @property {number} hardLimitRatio - Percent of the hard limit used by warp 10
* @property {number} hardLimitSetting - The hard limit setting in bytes
*/
/**
*
* @param {MonitorDiskUsageMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.isLocked !== undefined) {
this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0);
}
if (metrics.leveldbBytes !== undefined) {
this._metricsHandlers.leveldbBytes.set(metrics.leveldbBytes);
}
if (metrics.datalogBytes !== undefined) {
this._metricsHandlers.datalogBytes.set(metrics.datalogBytes);
}
if (metrics.hardLimitRatio !== undefined) {
this._metricsHandlers.hardLimitRatio.set(metrics.hardLimitRatio);
}
if (metrics.hardLimitSetting !== undefined) {
this._metricsHandlers.hardLimitSetting.set(metrics.hardLimitSetting);
}
}
get isLeader() { get isLeader() {
return this._program.leader !== undefined; return this._program.leader !== undefined;
} }
@ -143,13 +54,9 @@ class MonitorDiskUsage extends BaseTask {
return this._program.lock !== undefined; return this._program.lock !== undefined;
} }
// eslint-disable-next-line class-methods-use-this _getUsage() {
async _getUsage(path) { moduleLogger.debug(`calculating disk usage for ${this._path}`);
moduleLogger.debug(`calculating disk usage for ${path}`); return getFolderSize(this._path);
if (!fs.existsSync(path)) {
throw Error(`failed to calculate usage for non-existent path ${path}`);
}
return getFolderSize(path);
} }
async _expireMetrics(timestamp) { async _expireMetrics(timestamp) {
@ -195,7 +102,7 @@ class MonitorDiskUsage extends BaseTask {
} }
_checkHardLimit(size, nodeId) { _checkHardLimit(size, nodeId) {
const hardPercentage = parseFloat((size / this._hardLimit).toFixed(2)); const hardPercentage = (size / this._hardLimit).toFixed(2);
const hardLimitHuman = formatDiskSize(this._hardLimit); const hardLimitHuman = formatDiskSize(this._hardLimit);
const hardLogger = moduleLogger.with({ const hardLogger = moduleLogger.with({
size, size,
@ -206,8 +113,6 @@ class MonitorDiskUsage extends BaseTask {
nodeId, nodeId,
}); });
this._pushMetrics({ hardLimitRatio: hardPercentage });
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`; const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
if (hardPercentage < WARN_THRESHOLD) { if (hardPercentage < WARN_THRESHOLD) {
@ -245,14 +150,12 @@ class MonitorDiskUsage extends BaseTask {
if (this.isManualUnlock) { if (this.isManualUnlock) {
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId }); moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
await this._enableWarp10Updates(); await this._enableWarp10Updates();
this._pushMetrics({ isLocked: false });
return; return;
} }
if (this.isManualLock) { if (this.isManualLock) {
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId }); moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates(); await this._disableWarp10Updates();
this._pushMetrics({ isLocked: true });
return; return;
} }
@ -267,21 +170,16 @@ class MonitorDiskUsage extends BaseTask {
return; return;
} }
let leveldbBytes = null; let size = null;
let datalogBytes = null;
try { try {
leveldbBytes = await this._getUsage(Path.join(this._path, 'leveldb')); size = await this._getUsage();
datalogBytes = await this._getUsage(Path.join(this._path, 'datalog'));
} catch (error) { } catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error }); moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return; return;
} }
this._pushMetrics({ leveldbBytes, datalogBytes });
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) { if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 using ${formatDiskSize(size)} of disk space`, { leveldbBytes, datalogBytes }); moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
const shouldLock = this._checkHardLimit(size, this.nodeId); const shouldLock = this._checkHardLimit(size, this.nodeId);
if (shouldLock) { if (shouldLock) {
@ -292,7 +190,6 @@ class MonitorDiskUsage extends BaseTask {
{ nodeId: this.nodeId }); { nodeId: this.nodeId });
await this._enableWarp10Updates(); await this._enableWarp10Updates();
} }
this._pushMetrics({ isLocked: shouldLock, hardLimitSetting: this._hardLimit });
} }
} }
} }

View File

@ -1,9 +1,8 @@
const assert = require('assert'); const assert = require('assert');
const async = require('async'); const async = require('async');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models'); const { UtapiMetric } = require('../models');
const config = require('../config'); // const config = require('../config');
const { checkpointLagSecs } = require('../constants'); const { checkpointLagSecs } = require('../constants');
const { const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
@ -16,87 +15,15 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs); const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask { class IngestShardTask extends BaseTask {
constructor(options) { constructor(config, stripEventUUID = true) {
super({ super(config);
enableMetrics: config.metrics.enabled, this._stripEventUUID = stripEventUUID;
metricsHost: config.metrics.host, }
metricsPort: config.metrics.ingestPort,
...options,
});
async _setup(config) {
await super._setup(config);
this._defaultSchedule = config.ingestionSchedule; this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds; this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const ingestedTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_ingest_total',
help: 'Total number of metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedSlow = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_slow_total',
help: 'Total number of slow metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedShards = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_ingest_total',
help: 'Total number of metric shards ingested',
labelNames: ['origin', 'containerName'],
});
const shardAgeTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_age_total',
help: 'Total aggregated age of shards',
labelNames: ['origin', 'containerName'],
});
return {
ingestedTotal,
ingestedSlow,
ingestedShards,
shardAgeTotal,
};
}
/**
* Metrics for IngestShardTask
* @typedef {Object} IngestShardMetrics
* @property {number} ingestedTotal - Number of events ingested
* @property {number} ingestedSlow - Number of slow events ingested
* @property {number} ingestedShards - Number of metric shards ingested
* @property {number} shardAgeTotal - Aggregated age of shards
*/
/**
*
* @param {IngestShardMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.ingestedTotal !== undefined) {
this._metricsHandlers.ingestedTotal.inc(metrics.ingestedTotal);
}
if (metrics.ingestedSlow !== undefined) {
this._metricsHandlers.ingestedSlow.inc(metrics.ingestedSlow);
}
if (metrics.ingestedShards !== undefined) {
this._metricsHandlers.ingestedShards.inc(metrics.ingestedShards);
}
if (metrics.shardAgeTotal !== undefined) {
this._metricsHandlers.shardAgeTotal.inc(metrics.shardAgeTotal);
}
} }
_hydrateEvent(data, stripTimestamp = false) { _hydrateEvent(data, stripTimestamp = false) {
@ -124,8 +51,6 @@ class IngestShardTask extends BaseTask {
return; return;
} }
let shardAgeTotal = 0;
let ingestedShards = 0;
await async.eachLimit(toIngest, 10, await async.eachLimit(toIngest, 10,
async shard => { async shard => {
if (await this._cache.shardExists(shard)) { if (await this._cache.shardExists(shard)) {
@ -156,20 +81,13 @@ class IngestShardTask extends BaseTask {
return warp10.ingest( return warp10.ingest(
{ {
className: metricClass, className: metricClass,
labels: { origin: config.nodeId }, labels: { origin: this._nodeId },
}, records, }, records,
); );
}); });
assert.strictEqual(status, records.length); assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard); await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`); logger.info(`ingested ${status} records from ${this._nodeId} into ${ingestedIntoNodeId}`);
shardAgeTotal += shardAge;
ingestedShards += 1;
this._pushMetrics({ ingestedTotal: records.length });
if (areSlowEvents) {
this._pushMetrics({ ingestedSlow: records.length });
}
} else { } else {
logger.debug('No events found in shard, cleaning up'); logger.debug('No events found in shard, cleaning up');
} }
@ -177,8 +95,6 @@ class IngestShardTask extends BaseTask {
logger.warn('shard does not exist', { shard }); logger.warn('shard does not exist', { shard });
} }
}); });
const shardAgeTotalSecs = shardAgeTotal / 1000000;
this._pushMetrics({ shardAgeTotal: shardAgeTotalSecs, ingestedShards });
} }
} }

View File

@ -7,12 +7,7 @@ const config = require('../config');
const metadata = require('../metadata'); const metadata = require('../metadata');
const { serviceToWarp10Label, warp10RecordType } = require('../constants'); const { serviceToWarp10Label, warp10RecordType } = require('../constants');
const { const { LoggerContext, convertTimestamp } = require('../utils');
LoggerContext,
logEventFilter,
convertTimestamp,
buildFilterChain,
} = require('../utils');
const logger = new LoggerContext({ const logger = new LoggerContext({
module: 'ReindexTask', module: 'ReindexTask',
@ -20,36 +15,9 @@ const logger = new LoggerContext({
class ReindexTask extends BaseTask { class ReindexTask extends BaseTask {
constructor(options) { constructor(options) {
super({ super(options);
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.reindexPort,
...options,
});
this._defaultSchedule = config.reindexSchedule; this._defaultSchedule = config.reindexSchedule;
this._defaultLag = 0; this._defaultLag = 0;
const eventFilters = (config && config.filter) || {};
this._shouldReindex = buildFilterChain((config && config.filter) || {});
// exponential backoff: max wait = 50 * 2 ^ 10 milliseconds ~= 51 seconds
this.ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters);
}
}
async _setup(includeDefaultOpts = true) {
await super._setup(includeDefaultOpts);
this._program.option(
'--bucket <bucket>',
'Manually specify a bucket to reindex. Can be used multiple times.',
(bucket, previous) => previous.concat([bucket]),
[],
);
} }
static async _indexBucket(bucket) { static async _indexBucket(bucket) {
@ -63,19 +31,12 @@ class ReindexTask extends BaseTask {
// eslint-disable-next-line no-continue // eslint-disable-next-line no-continue
continue; continue;
} }
if (!Number.isInteger(obj.value['content-length'])) {
logger.debug('object missing content-length, not including in count');
// eslint-disable-next-line no-continue
continue;
}
count += 1; count += 1;
size += obj.value['content-length']; size += obj.value['content-length'];
// If versioned, subtract the size of the master to avoid double counting // If versioned, subtract the size of the master to avoid double counting
if (lastMaster && obj.name === lastMaster) { if (lastMaster && obj.name === lastMaster) {
logger.debug('Detected versioned key. subtracting master size', { lastMasterSize, key: obj.name }); logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
size -= lastMasterSize; size -= lastMasterSize;
count -= 1; count -= 1;
lastMaster = null; lastMaster = null;
@ -103,43 +64,28 @@ class ReindexTask extends BaseTask {
const res = await this.withWarp10(warp10 => { const res = await this.withWarp10(warp10 => {
const options = { const options = {
params: { params: {
start: timestamp,
end: timestamp, end: timestamp,
node: warp10.nodeId, node: warp10.nodeId,
labels: { labels: {
[level]: resource, [level]: resource,
}, },
// eslint-disable-next-line camelcase
no_reindex: true,
}, },
macro: 'utapi/getMetricsAt', macro: 'utapi/getMetrics',
}; };
return warp10.exec(options); return warp10.exec(options);
}); });
return { timestamp, value: JSON.parse(res.result[0]) };
const [value] = res.result || [];
if (!value) {
throw new Error('unable to fetch current metrics from warp10');
}
if (!Number.isInteger(value.objD) || !Number.isInteger(value.sizeD)) {
logger.error('invalid values returned from warp 10', { response: res });
throw new Error('invalid values returned from warp 10');
}
return {
timestamp,
value,
};
} }
async _updateMetric(level, resource, total) { async _updateMetric(level, resource, total) {
const { timestamp, value } = await this._fetchCurrentMetrics(level, resource); const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
const objectDelta = total.count - value.objD; const objectDelta = total.count - value.numberOfObjects[0];
const sizeDelta = total.size - value.sizeD; const sizeDelta = total.size - value.storageUtilized[0];
if (objectDelta !== 0 || sizeDelta !== 0) { if (objectDelta !== 0 || sizeDelta !== 0) {
logger.info('discrepancy detected in metrics. writing corrective record', logger.info('discrepancy detected in metrics, writing corrective record',
{ [level]: resource, objectDelta, sizeDelta }); { [level]: resource, objectDelta, sizeDelta });
const record = new UtapiRecord({ const record = new UtapiRecord({
@ -160,42 +106,25 @@ class ReindexTask extends BaseTask {
} }
} }
get targetBuckets() {
if (this._program.bucket.length) {
return this._program.bucket.map(name => ({ name }));
}
return metadata.listBuckets();
}
async _execute() { async _execute() {
logger.info('started reindex task'); logger.debug('reindexing objects');
const accountTotals = {}; const accountTotals = {};
const ignoredAccounts = new Set(); const ignoredAccounts = new Set();
await async.eachLimit(this.targetBuckets, 5, async bucket => {
if (!this._shouldReindex({ bucket: bucket.name, account: bucket.account })) {
logger.debug('skipping excluded bucket', { bucket: bucket.name, account: bucket.account });
return;
}
logger.info('started bucket reindex', { bucket: bucket.name }); await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
logger.trace('starting reindex of bucket', { bucket: bucket.name });
const mpuBucket = `${mpuBucketPrefix}${bucket.name}`; const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
let bktTotal; let bktTotal;
let mpuTotal; let mpuTotal;
try { try {
bktTotal = await async.retryable(this.ebConfig, ReindexTask._indexBucket)(bucket.name); bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(this.ebConfig, ReindexTask._indexMpuBucket)(mpuBucket); mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
} catch (error) { } catch (error) {
logger.error( logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
'failed bucket reindex. any associated account will be skipped',
{ error, bucket: bucket.name },
);
// buckets passed with `--bucket` won't have an account property
if (bucket.account) {
ignoredAccounts.add(bucket.account); ignoredAccounts.add(bucket.account);
}
return; return;
} }
@ -204,45 +133,33 @@ class ReindexTask extends BaseTask {
count: bktTotal.count, count: bktTotal.count,
}; };
// buckets passed with `--bucket` won't have an account property
if (bucket.account) {
if (accountTotals[bucket.account]) { if (accountTotals[bucket.account]) {
accountTotals[bucket.account].size += total.size; accountTotals[bucket.account].size += total.size;
accountTotals[bucket.account].count += total.count; accountTotals[bucket.account].count += total.count;
} else { } else {
accountTotals[bucket.account] = { ...total }; accountTotals[bucket.account] = { ...total };
} }
}
logger.info('finished bucket reindex', { bucket: bucket.name }); logger.trace('finished indexing bucket', { bucket: bucket.name });
try {
await this._updateMetric( await this._updateMetric(
serviceToWarp10Label.buckets, serviceToWarp10Label.buckets,
bucket.name, bucket.name,
total, total,
); );
} catch (error) {
logger.error('error updating metrics for bucket', { error, bucket: bucket.name });
}
}); });
const toUpdate = Object.entries(accountTotals) const toUpdate = Object.entries(accountTotals)
.filter(([account]) => !ignoredAccounts.has(account)); .filter(([account]) => !ignoredAccounts.has(account));
await async.eachLimit(toUpdate, 5, async ([account, total]) => { await async.eachLimit(toUpdate, 5, async ([account, total]) =>
try { this._updateMetric(
await this._updateMetric(
serviceToWarp10Label.accounts, serviceToWarp10Label.accounts,
account, account,
total, total,
); ));
} catch (error) {
logger.error('error updating metrics for account', { error, account });
}
});
logger.info('finished reindex task'); logger.debug('finished reindexing');
} }
} }

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const config = require('../config'); const config = require('../config');
const { LoggerContext } = require('../utils'); const { LoggerContext } = require('../utils');
@ -10,51 +9,11 @@ const logger = new LoggerContext({
class RepairTask extends BaseTask { class RepairTask extends BaseTask {
constructor(options) { constructor(options) {
super({ super(options);
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.repairPort,
...options,
});
this._defaultSchedule = config.repairSchedule; this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs; this._defaultLag = repairLagSecs;
} }
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_repair_task_created_total',
help: 'Total number of repair records created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
/**
* Metrics for RepairTask
* @typedef {Object} RepairMetrics
* @property {number} created - Number of repair records created
*/
/**
*
* @param {RepairMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _execute(timestamp) { async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId }); logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
@ -71,7 +30,6 @@ class RepairTask extends BaseTask {
}); });
if (status.result[0]) { if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`); logger.info(`created ${status.result[0]} corrections`);
this._pushMetrics({ created: status.result[0] });
} }
} }
} }

View File

@ -1,47 +0,0 @@
const assert = require('assert');
/**
* filterObject
*
* Constructs a function meant for filtering Objects by the value of a key
* Returned function returns a boolean with false meaning the object was present
* in the filter allowing the function to be passed directly to Array.filter etc.
*
* @param {string} key - Object key to inspect
* @param {Object} filter
* @param {Set} [filter.allow] - Set containing keys to include
* @param {Set} [filter.deny] - Set containing keys to not include
* @returns {function(Object): bool}
*/
function filterObject(obj, key, { allow, deny }) {
if (allow && deny) {
throw new Error('You can not define both an allow and a deny list.');
}
if (!allow && !deny) {
throw new Error('You must define either an allow or a deny list.');
}
if (allow) {
assert(allow instanceof Set);
return obj[key] === undefined || allow.has(obj[key]);
}
assert(deny instanceof Set);
return obj[key] === undefined || !deny.has(obj[key]);
}
/**
* buildFilterChain
*
* Constructs a function from a map of key names and allow/deny filters.
* The returned function returns a boolean with false meaning the object was present
* in one of the filters allowing the function to be passed directly to Array.filter etc.
*
* @param {Object<string, Object<string, Set>} filters
* @returns {function(Object): bool}
*/
function buildFilterChain(filters) {
return obj => Object.entries(filters).every(([key, filter]) => filterObject(obj, key, filter));
}
module.exports = { filterObject, buildFilterChain };

View File

@ -42,21 +42,19 @@ function comprehend(data, func) {
* @returns {*} - * @returns {*} -
*/ */
async function iterIfError(items, func, onError) { async function iterIfError(items, func, onError) {
let error;
// eslint-disable-next-line no-restricted-syntax // eslint-disable-next-line no-restricted-syntax
for (const item of items) { for (const item of items) {
try { try {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
const resp = await func(item); const resp = await func(item);
return resp; return resp;
} catch (_error) { } catch (error) {
if (onError) { if (onError) {
onError(_error); onError(error);
}
error = _error;
} }
} }
throw error || new Error('unable to complete request'); }
throw new Error('unable to complete request');
} }
module.exports = { module.exports = {

View File

@ -3,8 +3,6 @@ const shard = require('./shard');
const timestamp = require('./timestamp'); const timestamp = require('./timestamp');
const func = require('./func'); const func = require('./func');
const disk = require('./disk'); const disk = require('./disk');
const filter = require('./filter');
const probe = require('./probe');
module.exports = { module.exports = {
...log, ...log,
@ -12,6 +10,4 @@ module.exports = {
...timestamp, ...timestamp,
...func, ...func,
...disk, ...disk,
...filter,
...probe,
}; };

View File

@ -1,5 +1,12 @@
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const { comprehend } = require('./func'); const config = require('../config');
const loggerConfig = {
level: config.logging.level,
dump: config.logging.dumpLevel,
};
werelogs.configure(loggerConfig);
const rootLogger = new werelogs.Logger('Utapi'); const rootLogger = new werelogs.Logger('Utapi');
@ -70,6 +77,8 @@ class LoggerContext {
} }
} }
rootLogger.debug('logger initialized', { loggerConfig });
function buildRequestLogger(req) { function buildRequestLogger(req) {
let reqUids = []; let reqUids = [];
if (req.headers['x-scal-request-uids'] !== undefined) { if (req.headers['x-scal-request-uids'] !== undefined) {
@ -93,26 +102,8 @@ function buildRequestLogger(req) {
return new LoggerContext({}, reqLogger); return new LoggerContext({}, reqLogger);
} }
function logEventFilter(logger, msg, eventFilters) {
const filterLog = comprehend(
eventFilters,
(level, rules) => ({
key: level,
value: comprehend(
rules,
(rule, values) => ({
key: rule,
value: Array.from(values),
}),
),
}),
);
logger(msg, { filters: filterLog });
}
module.exports = { module.exports = {
logger: rootLogger, logger: rootLogger,
buildRequestLogger, buildRequestLogger,
LoggerContext, LoggerContext,
logEventFilter,
}; };

View File

@ -1,32 +0,0 @@
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
/**
* Configure probe servers
* @typedef {Object} ProbeServerConfig
* @property {string} bindAddress - Address to bind probe server to
* @property {number} port - Port to bind probe server to
*/
/**
* Start an empty probe server
* @async
* @param {ProbeServerConfig} config - Configuration for probe server
* @returns {Promise<ProbeServer>} - Instance of ProbeServer
*/
async function startProbeServer(config) {
if (!config) {
throw new Error('configuration for probe server is missing');
}
return new Promise((resolve, reject) => {
const probeServer = new ProbeServer(config);
probeServer.onListening(() => resolve(probeServer));
probeServer.onError(err => reject(err));
probeServer.start();
});
}
module.exports = {
startProbeServer,
};

179
libV2/vault.js Normal file
View File

@ -0,0 +1,179 @@
const assert = require('assert');
const { auth, policies } = require('arsenal');
const vaultclient = require('vaultclient');
const config = require('./config');
const errors = require('./errors');
/**
@class Vault
* Creates a vault instance for authentication and authorization
*/
class Vault {
constructor(options) {
const { host, port } = options.vaultd;
if (options.tls) {
const { key, cert, ca } = options.tls;
this._client = new vaultclient.Client(host, port, true, key, cert,
ca);
} else {
this._client = new vaultclient.Client(host, port);
}
}
/** authenticateV4Request
*
* @param {object} params - the authentication parameters as returned by
* auth.extractParams
* @param {number} params.version - shall equal 4
* @param {string} params.data.accessKey - the user's accessKey
* @param {string} params.data.signatureFromRequest - the signature read from
* the request
* @param {string} params.data.region - the AWS region
* @param {string} params.data.stringToSign - the stringToSign
* @param {string} params.data.scopeDate - the timespan to allow the request
* @param {string} params.data.authType - the type of authentication
* (query or header)
* @param {string} params.data.signatureVersion - the version of the
* signature (AWS or AWS4)
* @param {number} params.data.signatureAge - the age of the signature in ms
* @param {string} params.data.log - the logger object
* @param {RequestContext []} requestContexts - an array of
* RequestContext instances which contain information
* for policy authorization check
* @param {function} callback - cb(err)
* @return {undefined}
*/
authenticateV4Request(params, requestContexts, callback) {
const {
accessKey, signatureFromRequest, region, scopeDate,
stringToSign,
} = params.data;
const { log } = params;
log.debug('authenticating V4 request');
const serializedRCs = requestContexts.map(rc => rc.serialize());
this._client.verifySignatureV4(
stringToSign, signatureFromRequest,
accessKey, region, scopeDate,
{ reqUid: log.getSerializedUids(), requestContext: serializedRCs },
(err, authInfo) => {
if (err) {
log.trace('error from vault', { error: err });
return callback(err);
}
return callback(null,
authInfo.message.body.authorizationResults);
},
);
}
/**
* Returns canonical Ids for a given list of account Ids
* @param {string[]} accountIds - list of account ids
* @param {object} log - Werelogs request logger
* @return {Promise} -
*/
getCanonicalIds(accountIds, log) {
log.debug('retrieving canonical ids for account ids', {
method: 'Vault.getCanonicalIds',
});
return new Promise((resolve, reject) =>
this._client.getCanonicalIdsByAccountIds(accountIds,
{ reqUid: log.getSerializedUids(), logger: log }, (err, res) => {
if (err) {
reject(err);
return;
}
if (!res.message || !res.message.body) {
reject(errors.InternalError);
return;
}
resolve(res.message.body.map(acc => ({
resource: acc.accountId,
id: acc.canonicalId,
})));
}));
}
}
const vault = new Vault(config);
auth.setHandler(vault);
async function translateResourceIds(level, resources, log) {
if (level === 'accounts') {
return vault.getCanonicalIds(resources, log);
}
return resources.map(resource => ({ resource, id: resource }));
}
async function authenticateRequest(request, action, level, resources) {
const policyContext = new policies.RequestContext(
request.headers,
request.query,
level,
resources,
request.ip,
request.ctx.encrypted,
action,
'utapi',
);
return new Promise((resolve, reject) => {
auth.server.doAuth(request, request.logger.logger, (err, res) => {
if (err && (err.InvalidAccessKeyId || err.AccessDenied)) {
resolve([false]);
return;
}
if (err) {
reject(err);
return;
}
// Will only have res if request is from a user rather than an account
let authorizedResources = resources;
if (res) {
try {
authorizedResources = res.reduce(
(authed, result) => {
if (result.isAllowed) {
// result.arn should be of format:
// arn:scality:utapi:::resourcetype/resource
assert(typeof result.arn === 'string');
assert(result.arn.indexOf('/') > -1);
const resource = result.arn.split('/')[1];
authed.push(resource);
request.logger.trace('access granted for resource', { resource });
}
return authed;
}, [],
);
} catch (err) {
reject(err);
}
} else {
request.logger.trace('granted access to all resources');
}
resolve([
authorizedResources.length !== 0,
authorizedResources,
]);
}, 's3', [policyContext]);
});
}
async function translateAndAuthorize(request, action, level, resources) {
const [authed, authorizedResources] = await authenticateRequest(request, action, level, resources);
if (!authed) {
return [authed];
}
const translated = await translateResourceIds(level, authorizedResources, request.logger.logger);
return [authed, translated];
}
module.exports = {
authenticateRequest,
translateAndAuthorize,
Vault,
vault,
};

View File

@ -1,135 +0,0 @@
const assert = require('assert');
const { auth, policies } = require('arsenal');
const config = require('../config');
const errors = require('../errors');
/**
@class Vault
* Creates a vault instance for authentication and authorization
*/
class VaultWrapper extends auth.Vault {
create(config) {
if (config.vaultd.host) {
return new VaultWrapper(config);
}
return null;
}
constructor(options) {
let client;
const { host, port } = options.vaultd;
const vaultclient = require('vaultclient');
if (options.tls) {
const { key, cert, ca } = options.tls;
client = new vaultclient.Client(host, port, true, key, cert,
ca);
} else {
client = new vaultclient.Client(host, port);
}
super(client, 'vault');
}
/**
* Returns canonical Ids for a given list of account Ids
* @param {string[]} accountIds - list of account ids
* @param {object} log - Werelogs request logger
* @return {Promise} -
*/
getCanonicalIds(accountIds, log) {
log.debug('retrieving canonical ids for account ids', {
method: 'Vault.getCanonicalIds',
accountIds,
});
return new Promise((resolve, reject) =>
this.client.getCanonicalIdsByAccountIds(accountIds,
{ reqUid: log.getSerializedUids(), logger: log }, (err, res) => {
if (err) {
reject(err);
return;
}
if (!res.message || !res.message.body) {
reject(errors.InternalError);
return;
}
resolve(res.message.body.map(acc => ({
resource: acc.accountId,
id: acc.canonicalId,
})));
}));
}
// eslint-disable-next-line class-methods-use-this
authenticateRequest(request, action, level, resources) {
const policyContext = new policies.RequestContext(
request.headers,
request.query,
level,
resources,
request.ip,
request.ctx.encrypted,
action,
'utapi',
);
return new Promise((resolve, reject) => {
auth.server.doAuth(
request,
request.logger.logger,
(err, authInfo, authRes) => {
if (err && err.is && (err.is.InvalidAccessKeyId || err.is.AccessDenied)) {
resolve({ authed: false });
return;
}
if (err) {
reject(err);
return;
}
// Only IAM users will return authorizedResources
let authorizedResources = resources;
if (authRes) {
authorizedResources = authRes
.filter(resource => resource.isAllowed)
.map(resource => {
// resource.arn should be of format:
// arn:scality:utapi:::resourcetype/resource
assert(typeof resource.arn === 'string');
assert(resource.arn.indexOf('/') > -1);
return resource.arn.split('/')[1];
});
}
resolve({ authed: true, authInfo, authorizedResources });
}, 's3', [policyContext],
);
});
}
getUsersById(userIds, log) {
log.debug('retrieving user arns for user ids', {
method: 'Vault.getUsersById',
userIds,
});
return new Promise((resolve, reject) =>
this.client.getUsersById(userIds,
{ reqUid: log.getSerializedUids(), logger: log }, (err, res) => {
if (err) {
reject(err);
return;
}
if (!res.message || !res.message.body) {
reject(errors.InternalError);
return;
}
resolve(res.message.body);
}));
}
}
const vault = VaultWrapper.create(config);
auth.setHandler(vault);
module.exports = {
VaultWrapper,
vault,
};

View File

@ -1,177 +0,0 @@
const { vault } = require('./client');
const metadata = require('../metadata');
const errors = require('../errors');
const config = require('../config');
async function authorizeAccountAccessKey(authInfo, level, resources, log) {
let authed = false;
let authedRes = [];
log.trace('Authorizing account', { resources });
switch (level) {
// Account keys can only query metrics their own account metrics
// So we can short circuit the auth to ->
// Did they request their account? Then authorize ONLY their account
case 'accounts':
authed = resources.some(r => r === authInfo.getShortid());
authedRes = authed ? [{ resource: authInfo.getShortid(), id: authInfo.getCanonicalID() }] : [];
break;
// Account keys are allowed access to any of their child users metrics
case 'users': {
let users;
try {
users = await vault.getUsersById(resources, log.logger);
} catch (error) {
log.error('failed to fetch user', { error });
throw errors.AccessDenied;
}
authedRes = users
.filter(user => user.parentId === authInfo.getShortid())
.map(user => ({ resource: user.id, id: user.id }));
authed = authedRes.length !== 0;
break;
}
// Accounts are only allowed access if they are the owner of the bucket
case 'buckets': {
const buckets = await Promise.all(
resources.map(async bucket => {
try {
const bucketMD = await metadata.getBucket(bucket);
return bucketMD;
} catch (error) {
log.error('failed to fetch metadata for bucket', { error, bucket });
throw errors.AccessDenied;
}
}),
);
authedRes = buckets
.filter(bucket => bucket.getOwner() === authInfo.getCanonicalID())
.map(bucket => ({ resource: bucket.getName(), id: bucket.getName() }));
authed = authedRes.length !== 0;
break;
}
// Accounts can not access service resources
case 'services':
break;
default:
log.error('Unknown metric level', { level });
throw new Error(`Unknown metric level ${level}`);
}
return [authed, authedRes];
}
async function authorizeUserAccessKey(authInfo, level, resources, log) {
let authed = false;
let authedRes = [];
log.trace('Authorizing IAM user', { resources });
// If no resources were authorized by Vault then no further checking is required
if (resources.length === 0) {
return [false, []];
}
// Get the parent account id from the user's arn
const parentAccountId = authInfo.getArn().split(':')[4];
// All users require an attached policy to query metrics
// Additional filtering is performed here to limit access to the user's account
switch (level) {
// User keys can only query metrics their own account metrics
// So we can short circuit the auth to ->
// Did they request their account? Then authorize ONLY their account
case 'accounts': {
authed = resources.some(r => r === parentAccountId);
authedRes = authed ? [{ resource: parentAccountId, id: authInfo.getCanonicalID() }] : [];
break;
}
// Users can query other user's metrics if they are under the same account
case 'users': {
let users;
try {
users = await vault.getUsersById(resources, log.logger);
} catch (error) {
log.error('failed to fetch user', { error });
throw errors.AccessDenied;
}
authedRes = users
.filter(user => user.parentId === parentAccountId)
.map(user => ({ resource: user.id, id: user.id }));
authed = authedRes.length !== 0;
break;
}
// Users can query bucket metrics if they are owned by the same account
case 'buckets': {
let buckets;
try {
buckets = await Promise.all(
resources.map(bucket => metadata.getBucket(bucket)),
);
} catch (error) {
log.error('failed to fetch metadata for bucket', { error });
throw error;
}
authedRes = buckets
.filter(bucket => bucket.getOwner() === authInfo.getCanonicalID())
.map(bucket => ({ resource: bucket.getName(), id: bucket.getName() }));
authed = authedRes.length !== 0;
break;
}
case 'services':
break;
default:
log.error('Unknown metric level', { level });
throw new Error(`Unknown metric level ${level}`);
}
return [authed, authedRes];
}
async function authorizeServiceUser(authInfo, level, resources, log) {
log.trace('Authorizing service user', { resources, arn: authInfo.getArn() });
// The service user is allowed access to any resource so no checking is done
if (level === 'accounts') {
const canonicalIds = await vault.getCanonicalIds(resources, log.logger);
return [canonicalIds.length !== 0, canonicalIds];
}
return [resources.length !== 0, resources.map(resource => ({ resource, id: resource }))];
}
async function translateAndAuthorize(request, action, level, resources) {
const {
authed,
authInfo,
authorizedResources,
} = await vault.authenticateRequest(request, action, level, resources);
if (!authed) {
return [false, []];
}
if (config.serviceUser.enabled && authInfo.getArn() === config.serviceUser.arn) {
return authorizeServiceUser(authInfo, level, authorizedResources, request.logger);
}
if (authInfo.isRequesterAnIAMUser()) {
return authorizeUserAccessKey(authInfo, level, authorizedResources, request.logger);
}
return authorizeAccountAccessKey(authInfo, level, authorizedResources, request.logger);
}
module.exports = {
translateAndAuthorize,
vault,
};

View File

@ -3,11 +3,6 @@ const needle = require('needle');
const assert = require('assert'); const assert = require('assert');
const { eventFieldsToWarp10, warp10EventType } = require('./constants'); const { eventFieldsToWarp10, warp10EventType } = require('./constants');
const _config = require('./config'); const _config = require('./config');
const { LoggerContext } = require('./utils');
const moduleLogger = new LoggerContext({
module: 'warp10',
});
class Warp10Client { class Warp10Client {
constructor(config) { constructor(config) {
@ -79,7 +74,6 @@ class Warp10Client {
async exec(params) { async exec(params) {
const payload = this._buildExecPayload(params); const payload = this._buildExecPayload(params);
const resp = await this._client.exec(payload); const resp = await this._client.exec(payload);
moduleLogger.info('warpscript executed', { stats: resp.meta });
return resp; return resp;
} }
@ -125,17 +119,22 @@ class Warp10Client {
} }
} }
const clients = _config.warp10.hosts.map( function buildWarp10Clients(hosts) {
return hosts.map(
val => new Warp10Client({ val => new Warp10Client({
readToken: _config.warp10.readToken, readToken: _config.warp10.readToken,
writeToken: _config.warp10.writeToken, writeToken: _config.warp10.writeToken,
connectTimeout: _config.warp10.connectTimeout,
requestTimeout: _config.warp10.requestTimeout,
...val, ...val,
}), }),
); );
}
// TODO Remove after all users have been moved to building their own clients
const clients = buildWarp10Clients(_config.warp10.hosts);
module.exports = { module.exports = {
Warp10Client, Warp10Client,
clients, clients,
buildWarp10Clients,
}; };

View File

@ -46,9 +46,6 @@
"s3:PutObjectRetention": 0, "s3:PutObjectRetention": 0,
"s3:GetObjectRetention": 0, "s3:GetObjectRetention": 0,
"s3:PutObjectLegalHold": 0, "s3:PutObjectLegalHold": 0,
"s3:GetObjectLegalHold": 0, "s3:GetObjectLegalHold": 0
"s3:ReplicateObject": 0,
"s3:ReplicateTags": 0,
"s3:ReplicateDelete": 0
} }
} }

View File

@ -101,12 +101,6 @@ components:
type: string type: string
level: level:
type: string type: string
utapi-get-prometheus-metrics:
description: metrics to be ingested by prometheus
content:
text/plain:
schema:
type: string
parameters: parameters:
level: level:
in: path in: path
@ -139,16 +133,6 @@ paths:
$ref: '#/components/responses/json-error' $ref: '#/components/responses/json-error'
200: 200:
description: Service is healthy description: Service is healthy
/_/metrics:
get:
x-router-controller: internal
x-iplimit: true
operationId: prometheusMetrics
responses:
default:
$ref: '#/components/responses/json-error'
200:
$ref: '#/components/responses/utapi-get-prometheus-metrics'
/v2/ingest: /v2/ingest:
post: post:
x-router-controller: metrics x-router-controller: metrics

View File

@ -1,9 +1,9 @@
{ {
"name": "utapi", "name": "utapi",
"engines": { "engines": {
"node": ">=16" "node": ">=10.19.0"
}, },
"version": "8.1.15", "version": "7.10.0",
"description": "API for tracking resource utilization and reporting metrics", "description": "API for tracking resource utilization and reporting metrics",
"main": "index.js", "main": "index.js",
"repository": { "repository": {
@ -19,44 +19,37 @@
"dependencies": { "dependencies": {
"@hapi/joi": "^17.1.1", "@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.14", "@senx/warp10": "^1.0.14",
"arsenal": "git+https://git.yourcmc.ru/vitalif/zenko-arsenal.git#development/8.1", "arsenal": "scality/Arsenal#65966f5",
"async": "^3.2.0", "async": "^3.2.0",
"aws-sdk": "^2.1005.0",
"aws4": "^1.8.0", "aws4": "^1.8.0",
"backo": "^1.1.0",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient",
"byte-size": "^7.0.0", "byte-size": "^7.0.0",
"commander": "^5.1.0", "commander": "^5.1.0",
"cron-parser": "^2.15.0", "cron-parser": "^2.15.0",
"diskusage": "^1.1.3", "diskusage": "^1.1.3",
"express": "^4.17.1", "express": "^4.17.1",
"get-folder-size": "^2.0.1", "get-folder-size": "^2.0.1",
"ioredis": "^4.28.0", "ioredis": "^4.9.5",
"js-yaml": "^3.14.0", "js-yaml": "^3.14.0",
"level-mem": "^5.0.1", "level-mem": "^5.0.1",
"needle": "^2.5.0", "needle": "^2.5.0",
"node-schedule": "^1.3.2", "node-schedule": "^1.3.2",
"oas-tools": "^2.2.2", "oas-tools": "^2.1.8",
"prom-client": "14.2.0",
"uuid": "^3.3.2", "uuid": "^3.3.2",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1" "vaultclient": "scality/vaultclient#ff9e92f",
"werelogs": "scality/werelogs#0a4c576"
}, },
"devDependencies": { "devDependencies": {
"eslint": "^8.14.0", "eslint": "^5.12.0",
"eslint-config-airbnb-base": "^15.0.0", "eslint-config-airbnb-base": "13.1.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git", "eslint-config-scality": "scality/Guidelines#7cc5ff1",
"eslint-plugin-import": "^2.18.0", "eslint-plugin-import": "2.14.0",
"mocha": ">=3.1.2", "mocha": "^3.0.2",
"nodemon": "^2.0.4", "nodemon": "^2.0.4",
"protobufjs": "^6.10.1", "protobufjs": "^6.10.1",
"sinon": "^9.0.2" "sinon": "^9.0.2"
}, },
"resolutions": {
"**/@yarnpkg/fslib": "2.4.0",
"**/@yarnpkg/libzip": "2.2.1",
"**/@yarnpkg/json-proxy": "2.1.0",
"**/@yarnpkg/parsers": "2.3.0"
},
"scripts": { "scripts": {
"ft_test": "mocha --recursive tests/functional", "ft_test": "mocha --recursive tests/functional",
"ft_test:client": "mocha --recursive tests/functional/client", "ft_test:client": "mocha --recursive tests/functional/client",

View File

@ -1,2 +0,0 @@
redis==5.0.3
requests==2.31.0

View File

@ -3,16 +3,17 @@ const assert = require('assert');
const url = require('url'); const url = require('url');
const { auth, errors, policies } = require('arsenal'); const { auth, errors, policies } = require('arsenal');
const safeJsonParse = require('../utils/safeJsonParse'); const safeJsonParse = require('../utils/safeJsonParse');
const Vault = require('../lib/Vault');
class Router { class Router {
/** /**
* @constructor * @constructor
* @param {Config} config - Config instance * @param {Config} config - Config instance
*/ */
constructor(config, vault) { constructor(config) {
this._service = config.component; this._service = config.component;
this._routes = {}; this._routes = {};
this._vault = vault; this._vault = new Vault(config);
} }
/** /**
@ -265,10 +266,6 @@ class Router {
*/ */
_processSecurityChecks(utapiRequest, route, cb) { _processSecurityChecks(utapiRequest, route, cb) {
const log = utapiRequest.getLog(); const log = utapiRequest.getLog();
if (process.env.UTAPI_AUTH === 'false') {
// Zenko route request does not need to go through Vault
return this._startRequest(utapiRequest, route, cb);
}
return this._authSquared(utapiRequest, err => { return this._authSquared(utapiRequest, err => {
if (err) { if (err) {
log.trace('error from vault', { errors: err }); log.trace('error from vault', { errors: err });

View File

@ -1,21 +1,4 @@
const fs = require('fs'); const config = require('./lib/Config');
const path = require('path');
const Config = require('./lib/Config');
const server = require('./lib/server'); const server = require('./lib/server');
/* server(Object.assign({}, config, { component: 's3' }));
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
const cfgpath = process.env.UTAPI_CONFIG_FILE || (__dirname+'/config.json');
let cfg;
try {
cfg = JSON.parse(fs.readFileSync(cfgpath, { encoding: 'utf-8' }));
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
cfg.component = 's3';
server(new Config(cfg));

View File

@ -1,141 +0,0 @@
const assert = require('assert');
const async = require('async');
const Redis = require('ioredis');
const { EventEmitter } = require('events');
const { makeUtapiGenericClientRequest } = require('../../utils/utils');
const Vault = require('../../utils/mock/Vault');
const host = '127.0.0.1';
const sentinels = [
{ host, port: 16379 },
];
const redis = new Redis({
port: 6379,
host,
sentinels,
name: 'scality-s3',
});
const sentinel = new Redis({
port: 16379,
host,
});
const sentinelSub = new Redis({
port: 16379,
host,
});
describe('Client connections', async function test() {
this.timeout(5000);
this.loadgen = new EventEmitter();
const makeRequest = (ctx, done) => {
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
const {
timeRange, type, resource, expected, action,
} = {
action: 'ListMetrics',
type: 'buckets',
resource: 'my-bucket',
timeRange: [
0,
((MAX_RANGE_MS / 60) - (1000 * 60) * 15) - 1,
],
expected: {
bucketName: 'my-bucket',
},
};
const headers = {
method: 'POST',
path: `/${type}?Action=${action}`,
};
const body = {
timeRange,
[type]: [resource],
};
ctx.requestsDuringFailover += 1;
makeUtapiGenericClientRequest(headers, body, (err, response) => {
if (err) {
return done(err);
}
const data = JSON.parse(response);
if (data.code) {
return new Error(data.message);
}
if (timeRange) {
assert.deepStrictEqual(timeRange, data[0].timeRange);
}
Object.entries(expected).forEach(([k, v]) => {
assert.strictEqual(data[0][k], v);
});
});
};
before(async () => {
await sentinelSub.subscribe('+slave');
this.connectionsBeforeFailover = null;
this.connectionsAfterFailover = null;
this.requestsDuringFailover = 0;
this.vault = new Vault();
this.vault.start();
});
after(async () => {
this.vault.end();
redis.disconnect();
sentinel.disconnect();
sentinelSub.disconnect();
});
beforeEach(async () => {
const clients = await redis.client('list');
this.connectionsBeforeFailover = clients.split('\n').length;
this.requestsDuringFailover = 0;
});
afterEach(async () => {
const clients = await redis.client('list');
this.connectionsAfterFailover = clients.split('\n').length;
assert(this.connectionsAfterFailover <= this.connectionsBeforeFailover);
});
it('should not add connections after failover under load', done => {
sentinel.sentinel('failover', 'scality-s3', (err, res) => {
if (err) {
return done(err);
}
assert.strictEqual(res, 'OK');
// Requests made with async.times will stay open when they occur
// during the failover window so and async.race is used to resolve
async.race([
() => setTimeout(() => this.loadgen.emit('finished'), 3000),
() => async.times(
100,
() => makeRequest(this, done),
() => this.loadgen.emit('finished'),
),
]);
});
sentinelSub.on('message', (chan, message) => {
// wait until the old master is added as a replica so any stale connections would be transfered
assert.strictEqual(chan, '+slave');
// checks that ports differ between old and new master
const data = message.split(' ');
const [oldPort, newPort] = [data[3], data[7]];
assert.notStrictEqual(oldPort, newPort);
return this.loadgen.on('finished', () => {
assert(this.requestsDuringFailover > 1);
return done();
});
});
});
});

View File

@ -77,9 +77,6 @@ const actions = [
'getObjectRetention', 'getObjectRetention',
'putObjectLegalHold', 'putObjectLegalHold',
'getObjectLegalHold', 'getObjectLegalHold',
'replicateObject',
'replicateDelete',
'replicateTags',
]; ];
// Get the proper params object for a pushMetric call for the given action. // Get the proper params object for a pushMetric call for the given action.
@ -97,7 +94,6 @@ function getParams(action) {
case 'uploadPart': case 'uploadPart':
case 'uploadPartCopy': case 'uploadPartCopy':
case 'putObject': case 'putObject':
case 'replicateObject':
case 'copyObject': case 'copyObject':
return Object.assign(resources, { return Object.assign(resources, {
newByteLength: objSize, newByteLength: objSize,
@ -209,22 +205,21 @@ function popAllMetrics(cb) {
// deleteObject metric // deleteObject metric
function checkAllMetrics(cb) { function checkAllMetrics(cb) {
const keys = getAllResourceTypeKeys(); const keys = getAllResourceTypeKeys();
return async.each(keys, (key, next) => return async.each(keys, (key, next) => datastore.get(key, (err, res) => {
datastore.get(key, (err, res) => {
if (err) { if (err) {
return next(err); return next(err);
} }
let expected = 1; // Actions should have been incremented once. let expected = 1; // Actions should have been incremented once.
if (key.includes('incomingBytes')) { if (key.includes('incomingBytes')) {
// putObject, uploadPart, uploadPartCopy, and replicateObject. // putObject, uploadPart, and uploadPartCopy.
expected = objSize * 4; expected = objSize * 3;
} else if (key.includes('outgoingBytes')) { } else if (key.includes('outgoingBytes')) {
expected = objSize; // getObject. expected = objSize; // getObject.
} else if (key.includes('storageUtilized')) { } else if (key.includes('storageUtilized')) {
// After all PUT and DELETE operations, should be 2048. // After all PUT and DELETE operations, should be 1024.
expected = 2048; expected = 1024;
} else if (key.includes('numberOfObjects')) { } else if (key.includes('numberOfObjects')) {
expected = 3; // After PUT and DELETE operations, should be 3. expected = 1; // After PUT and DELETE operations, should be 1.
} else if (key.endsWith('DeleteObject')) { } else if (key.endsWith('DeleteObject')) {
expected = 2; // After DELETE operations, should be 2. expected = 2; // After DELETE operations, should be 2.
} }

View File

@ -1,14 +1,11 @@
const assert = require('assert'); const assert = require('assert');
const fs = require('fs');
const sinon = require('sinon'); const sinon = require('sinon');
const uuid = require('uuid'); const uuid = require('uuid');
const promClient = require('prom-client');
const { clients: warp10Clients } = require('../../../libV2/warp10'); const { clients: warp10Clients } = require('../../../libV2/warp10');
const { MonitorDiskUsage } = require('../../../libV2/tasks'); const { MonitorDiskUsage } = require('../../../libV2/tasks');
const { fillDir } = require('../../utils/v2Data'); const { fillDir } = require('../../utils/v2Data');
const { assertMetricValue } = require('../../utils/prom');
// eslint-disable-next-line func-names // eslint-disable-next-line func-names
describe('Test MonitorDiskUsage hard limit', function () { describe('Test MonitorDiskUsage hard limit', function () {
@ -18,18 +15,14 @@ describe('Test MonitorDiskUsage hard limit', function () {
beforeEach(async () => { beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`; path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(`${path}/datalog`, { recursive: true }); task = new MonitorDiskUsage({ warp10: warp10Clients });
promClient.register.clear();
task = new MonitorDiskUsage({ warp10: warp10Clients, enableMetrics: true });
await task.setup(); await task.setup();
task._path = path; task._path = path;
task._enabled = true; task._enabled = true;
}); });
afterEach(async () => task.join());
it('should trigger a database lock if above the limit', async () => { it('should trigger a database lock if above the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1; task._hardLimit = 1;
const checkSpy = sinon.spy(task, '_checkHardLimit'); const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates'); const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -41,11 +34,10 @@ describe('Test MonitorDiskUsage hard limit', function () {
assert(lockSpy.calledOnce); assert(lockSpy.calledOnce);
assert(unlockSpy.notCalled); assert(unlockSpy.notCalled);
assert(execStub.calledOnce); assert(execStub.calledOnce);
await assertMetricValue('s3_utapi_monitor_disk_usage_hard_limit_bytes', 1);
}); });
it('should trigger a database unlock if below the limit', async () => { it('should trigger a database unlock if below the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 10240; task._hardLimit = 10240;
const checkSpy = sinon.spy(task, '_checkHardLimit'); const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates'); const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -58,7 +50,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
}); });
it('should not throw when failing to calculate disk usage', async () => { it('should not throw when failing to calculate disk usage', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 }); fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1; task._hardLimit = 1;
sinon.stub(task, '_getUsage').throws(); sinon.stub(task, '_getUsage').throws();
const _task = task.execute(); const _task = task.execute();

View File

@ -1,5 +1,5 @@
const assert = require('assert'); const assert = require('assert');
const { makeUtapiGenericClientRequest } = require('../../utils/utils'); const { makeUtapiClientRequest } = require('../../utils/utils');
const Vault = require('../../utils/mock/Vault'); const Vault = require('../../utils/mock/Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month. const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -18,100 +18,34 @@ describe('Request ranges', function test() {
const tests = [ const tests = [
{ {
action: 'ListMetrics', start: 0,
type: 'accounts', end: ((MAX_RANGE_MS / 60) - (1000 * 60) * 15) - 1,
resource: '1234567890',
timeRange: [
0,
((MAX_RANGE_MS / 60) - (1000 * 60) * 15) - 1,
],
expected: {
accountId: '1234567890',
},
}, },
{ {
action: 'ListMetrics', start: 0,
type: 'buckets', end: MAX_RANGE_MS - 1,
resource: 'my-bucket',
timeRange: [
0,
((MAX_RANGE_MS / 60) - (1000 * 60) * 15) - 1,
],
expected: {
bucketName: 'my-bucket',
},
}, },
{ {
action: 'ListMetrics', start: 0,
type: 'buckets', end: (MAX_RANGE_MS + (1000 * 60) * 15) - 1,
resource: 'my-bucket',
timeRange: [
0,
MAX_RANGE_MS - 1,
],
expected: {
bucketName: 'my-bucket',
},
}, },
{ {
action: 'ListMetrics', start: 0,
type: 'buckets', end: (MAX_RANGE_MS * 12) - 1,
resource: 'my-bucket',
timeRange: [
0,
(MAX_RANGE_MS + (1000 * 60) * 15) - 1,
],
expected: {
bucketName: 'my-bucket',
},
},
{
action: 'ListMetrics',
type: 'buckets',
resource: 'my-bucket',
timeRange: [
0,
(MAX_RANGE_MS * 12) - 1,
],
expected: {
bucketName: 'my-bucket',
},
},
{
action: 'ListRecentMetrics',
type: 'buckets',
resource: 'my-bucket',
expected: {
bucketName: 'my-bucket',
},
},
{
action: 'ListRecentMetrics',
type: 'accounts',
resource: '1234567890',
expected: {
accountId: '1234567890',
},
}, },
]; ];
tests.forEach(test => { tests.forEach(test => {
const { const { start, end } = test;
timeRange, type, resource, expected, action, it(`should handle a request range of ${end - start}ms`, done => {
} = test; const params = {
const msg = timeRange timeRange: [start, end],
? `should handle a request range of ${timeRange[1] - timeRange[0]}ms` resource: {
: 'should handle a ListRecentMetrics request'; type: 'buckets',
it(msg, done => { buckets: ['my-bucket'],
const headers = { },
method: 'POST',
path: `/${type}?Action=${action}`,
}; };
const body = { makeUtapiClientRequest(params, (err, response) => {
timeRange,
[type]: [resource],
};
makeUtapiGenericClientRequest(headers, body, (err, response) => {
if (err) { if (err) {
return done(err); return done(err);
} }
@ -119,14 +53,8 @@ describe('Request ranges', function test() {
if (data.code) { if (data.code) {
return done(new Error(data.message)); return done(new Error(data.message));
} }
const { timeRange } = data[0];
if (timeRange) { assert.deepStrictEqual(timeRange, [start, end]);
assert.deepStrictEqual(timeRange, data[0].timeRange);
}
Object.entries(expected).forEach(([k, v]) => {
assert.strictEqual(data[0][k], v);
});
return done(); return done();
}); });
}); });

View File

@ -9,8 +9,7 @@ const { convertTimestamp, now } = require('../../../../libV2/utils');
const { operationToResponse } = require('../../../../libV2/constants'); const { operationToResponse } = require('../../../../libV2/constants');
const { generateCustomEvents } = require('../../../utils/v2Data'); const { generateCustomEvents } = require('../../../utils/v2Data');
const { BucketD } = require('../../../utils/mock/'); const { UtapiMetric } = require('../../../../libV2/models');
const vaultclient = require('../../../utils/vaultclient');
const warp10 = warp10Clients[0]; const warp10 = warp10Clients[0];
const _now = Math.floor(new Date().getTime() / 1000); const _now = Math.floor(new Date().getTime() / 1000);
@ -29,7 +28,7 @@ const emptyOperationsResponse = Object.values(operationToResponse)
return prev; return prev;
}, {}); }, {});
async function listMetrics(level, resources, start, end, credentials) { async function listMetrics(level, resources, start, end, force403 = false) {
const body = { const body = {
[level]: resources, [level]: resources,
}; };
@ -51,13 +50,12 @@ async function listMetrics(level, resources, start, end, credentials) {
}, },
}; };
const { accessKey: accessKeyId, secretKey: secretAccessKey } = credentials; const credentials = {
const _credentials = { accessKeyId: force403 ? 'invalidKey' : 'accessKey1',
accessKeyId, secretAccessKey: 'verySecretKey1',
secretAccessKey,
}; };
const sig = aws4.sign(headers, _credentials); const sig = aws4.sign(headers, credentials);
return needle( return needle(
'post', 'post',
@ -83,199 +81,140 @@ function opsToResp(operations) {
}, { ...emptyOperationsResponse }); }, { ...emptyOperationsResponse });
} }
function assertMetricResponse(provided, expected) { const testCases = [
assert.deepStrictEqual(provided.operations, opsToResp(expected.ops)); {
assert.strictEqual(provided.incomingBytes, expected.in); desc: 'for a single resource',
assert.strictEqual(provided.outgoingBytes, expected.out); args: { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
assert.deepStrictEqual(provided.storageUtilized, [0, expected.bytes]); },
assert.deepStrictEqual(provided.numberOfObjects, [0, expected.count]); {
} desc: 'for multiple resources',
args: {
[uuid.v4()]: {
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
},
[uuid.v4()]: {
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
},
[uuid.v4()]: {
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
[uuid.v4()]: [uuid.v4(), uuid.v4(), uuid.v4()],
},
},
},
];
describe('Test listMetric', function () { describe('Test listMetric', function () {
this.timeout(10000); this.timeout(10000);
const bucketd = new BucketD(true); testCases.forEach(testCase => {
describe(testCase.desc, () => {
let account;
let user;
let otherAccount;
let otherUser;
let serviceAccount;
let serviceUser;
const bucket = uuid.v4();
const otherBucket = uuid.v4();
let totals; let totals;
before(async () => { before(async () => {
account = await vaultclient.createAccountAndKeys(uuid.v4());
user = await vaultclient.createUserAndKeys(account, uuid.v4());
otherAccount = await vaultclient.createAccountAndKeys(uuid.v4());
otherUser = await vaultclient.createUser(otherAccount, uuid.v4());
serviceAccount = await vaultclient.createInternalServiceAccountAndKeys();
serviceUser = await vaultclient.ensureServiceUser(serviceAccount);
await Promise.all([
vaultclient.createAndAttachUtapiPolicy(account, user, 'buckets', '*'),
vaultclient.createAndAttachUtapiPolicy(account, user, 'accounts', '*'),
vaultclient.createAndAttachUtapiPolicy(account, user, 'users', '*'),
]);
bucketd.createBucketsWithOwner([
{ name: bucket, owner: account.canonicalId },
{ name: otherBucket, owner: otherAccount.canonicalId },
]);
bucketd.start();
const { events, totals: _totals } = generateCustomEvents( const { events, totals: _totals } = generateCustomEvents(
getTs(-360), getTs(-360),
getTs(-60), getTs(-60),
1000, 1000,
{ [account.canonicalId]: { [user.id]: [bucket] } }, testCase.args,
); );
totals = _totals; totals = _totals;
assert(await ingestEvents(events)); assert(await ingestEvents(events));
}); });
after(async () => { after(async () => {
bucketd.end();
await warp10.delete({ await warp10.delete({
className: '~.*', className: '~.*',
start: 0, start: 0,
end: now(), end: now(),
}); });
await vaultclient.cleanupAccountAndUsers(account);
await vaultclient.cleanupAccountAndUsers(otherAccount);
await vaultclient.cleanupAccountAndUsers(serviceAccount);
}); });
describe('test account credentials', () => { const accounts = [];
it('should list metrics for the same account', async () => { const users = [];
const resp = await listMetrics('accounts', [account.id], getTs(-500), getTs(0), account); const buckets = [];
Object.entries(testCase.args)
.forEach(([account, _users]) => {
accounts.push(`account:${account}`);
Object.entries(_users).forEach(([user, _buckets]) => {
users.push(user);
buckets.push(..._buckets);
});
});
const metricQueries = {
accounts,
users,
buckets,
};
Object.entries(metricQueries)
.forEach(query => {
const [level, resources] = query;
it(`should get metrics for ${level}`, async () => {
const resp = await listMetrics(...query, getTs(-500), getTs(0));
assert(Array.isArray(resp.body)); assert(Array.isArray(resp.body));
const { body } = resp; const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.accounts]), [account.id]); assert.deepStrictEqual(body.map(r => r[metricResponseKeys[level]]), resources);
body.forEach(metric => { body.forEach(metric => {
assertMetricResponse(metric, totals.accounts[account.canonicalId]); const key = metric[metricResponseKeys[level]];
const _key = level === 'accounts' ? key.split(':')[1] : key;
const expected = totals[level][_key];
assert.deepStrictEqual(metric.operations, opsToResp(expected.ops));
assert.strictEqual(metric.incomingBytes, expected.in);
assert.strictEqual(metric.outgoingBytes, expected.out);
assert.deepStrictEqual(metric.storageUtilized, [0, expected.bytes]);
assert.deepStrictEqual(metric.numberOfObjects, [0, expected.count]);
});
});
});
}); });
}); });
it("should list metrics for an account's user", async () => { it('should return 0 in metrics are negative', async () => {
const resp = await listMetrics('users', [user.id], getTs(-500), getTs(0), account); const bucket = `imabucket-${uuid.v4()}`;
assert(Array.isArray(resp.body)); const account = `imaaccount-${uuid.v4()}`;
const { body } = resp; const event = new UtapiMetric({
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.users]), [user.id]); timestamp: getTs(0),
body.forEach(metric => { bucket,
assertMetricResponse(metric, totals.users[user.id]); account,
}); objectDelta: -1,
sizeDelta: -1,
incomingBytes: -1,
outgoingBytes: -1,
operationId: 'putObject',
}); });
it("should list metrics for an account's bucket", async () => { await ingestEvents([event]);
const resp = await listMetrics('buckets', [bucket], getTs(-500), getTs(0), account);
assert(Array.isArray(resp.body)); const bucketResp = await listMetrics('buckets', [bucket], getTs(-1), getTs(1));
const { body } = resp; assert(Array.isArray(bucketResp.body));
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.buckets]), [bucket]);
body.forEach(metric => { const [bucketMetric] = bucketResp.body;
assertMetricResponse(metric, totals.buckets[bucket]); assert.deepStrictEqual(bucketMetric.storageUtilized, [0, 0]);
}); assert.deepStrictEqual(bucketMetric.numberOfObjects, [0, 0]);
assert.deepStrictEqual(bucketMetric.incomingBytes, 0);
assert.deepStrictEqual(bucketMetric.outgoingBytes, 0);
const accountResp = await listMetrics('accounts', [account], getTs(-1), getTs(1));
assert(Array.isArray(accountResp.body));
const [accountMetric] = accountResp.body;
assert.deepStrictEqual(accountMetric.storageUtilized, [0, 0]);
assert.deepStrictEqual(accountMetric.numberOfObjects, [0, 0]);
assert.deepStrictEqual(accountMetric.incomingBytes, 0);
assert.deepStrictEqual(accountMetric.outgoingBytes, 0);
}); });
it('should return Access Denied for a different account', async () => { it('should return a 403 if unauthorized', async () => {
const resp = await listMetrics('accounts', [otherAccount.id], getTs(-500), getTs(0), account); const resp = await listMetrics('buckets', ['test'], getTs(-1), getTs(1), true);
assert.strictEqual(resp.statusCode, 403); assert.strictEqual(resp.statusCode, 403);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
}); });
it("should return Access Denied for a different account's user", async () => { it('should use the current timestamp for "end" if it is not provided', async () => {
const resp = await listMetrics('users', [otherUser.id], getTs(-500), getTs(0), account); const resp = await listMetrics('buckets', ['test'], getTs(-1));
assert.strictEqual(resp.statusCode, 403); assert.strictEqual(resp.body[0].timeRange.length, 2);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
});
it("should return Access Denied for a different account's bucket", async () => {
const resp = await listMetrics('buckets', [otherBucket], getTs(-500), getTs(0), account);
assert.strictEqual(resp.statusCode, 403);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
});
});
describe('test user credentials', () => {
it('should list metrics for the same account', async () => {
const resp = await listMetrics('accounts', [account.id], getTs(-500), getTs(0), user);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.accounts]), [account.id]);
body.forEach(metric => {
assertMetricResponse(metric, totals.accounts[account.canonicalId]);
});
});
it("should list metrics for a user's account", async () => {
const resp = await listMetrics('users', [user.id], getTs(-500), getTs(0), user);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.users]), [user.id]);
body.forEach(metric => {
assertMetricResponse(metric, totals.users[user.id]);
});
});
it("should list metrics for an user's bucket", async () => {
const resp = await listMetrics('buckets', [bucket], getTs(-500), getTs(0), user);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.buckets]), [bucket]);
body.forEach(metric => {
assertMetricResponse(metric, totals.buckets[bucket]);
});
});
it("should return Access Denied for a different user's account", async () => {
const resp = await listMetrics('accounts', [otherAccount.id], getTs(-500), getTs(0), user);
assert.strictEqual(resp.statusCode, 403);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
});
it("should return Access Denied for a different account's user", async () => {
const resp = await listMetrics('users', [otherUser.id], getTs(-500), getTs(0), user);
assert.strictEqual(resp.statusCode, 403);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
});
it("should return Access Denied for a different account's bucket", async () => {
const resp = await listMetrics('buckets', [otherBucket], getTs(-500), getTs(0), user);
assert.strictEqual(resp.statusCode, 403);
assert.deepStrictEqual(resp.body, { code: 'AccessDenied', message: 'Access Denied' });
});
});
describe('test service user credentials', () => {
it('should list metrics for an account', async () => {
const resp = await listMetrics('accounts', [account.id], getTs(-500), getTs(0), serviceUser);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.accounts]), [account.id]);
body.forEach(metric => {
assertMetricResponse(metric, totals.accounts[account.canonicalId]);
});
});
it('should list metrics for a user', async () => {
const resp = await listMetrics('users', [user.id], getTs(-500), getTs(0), serviceUser);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.users]), [user.id]);
body.forEach(metric => {
assertMetricResponse(metric, totals.users[user.id]);
});
});
it('should list metrics for a bucket', async () => {
const resp = await listMetrics('buckets', [bucket], getTs(-500), getTs(0), serviceUser);
assert(Array.isArray(resp.body));
const { body } = resp;
assert.deepStrictEqual(body.map(r => r[metricResponseKeys.buckets]), [bucket]);
body.forEach(metric => {
assertMetricResponse(metric, totals.buckets[bucket]);
});
});
}); });
}); });

View File

@ -1,23 +0,0 @@
const assert = require('assert');
const needle = require('needle');
const testMetrics = async pair => {
const [name, url] = pair;
it(`should return metrics for ${name} from url ${url}`, async () => {
const res = await needle('get', url);
const lines = res.body.split('\n');
const first = lines[0];
assert.strictEqual(res.statusCode, 200);
assert(first.startsWith('# HELP'));
});
};
describe('Test Prometheus Metrics', () => {
const nameUrlPairs = [
['utapi nodejs service exporter', 'http://localhost:8100/_/metrics'],
['sensision exporter', 'http://localhost:9718/metrics'],
['redis exporter', 'http://localhost:9121/metrics'],
];
nameUrlPairs.forEach(pair => testMetrics(pair));
});

View File

@ -1,83 +0,0 @@
const assert = require('assert');
const needle = require('needle');
const promClient = require('prom-client');
const sinon = require('sinon');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { BaseTask } = require('../../../../libV2/tasks');
const { clients: warp10Clients } = require('../../../../libV2/warp10');
const { getMetricValues } = require('../../../utils/prom');
const METRICS_SERVER_PORT = 10999;
class CustomTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const foo = new promClient.Gauge({
name: 's3_utapi_custom_task_foo_total',
help: 'Count of foos',
labelNames: ['origin', 'containerName'],
});
return { foo };
}
async _execute() {
this._metricsHandlers.foo.inc(1);
}
}
describe('Test BaseTask metrics', () => {
let task;
beforeEach(async () => {
task = new CustomTask({
enableMetrics: true,
metricsPort: METRICS_SERVER_PORT,
warp10: [warp10Clients[0]],
});
await task.setup();
});
afterEach(async () => {
await task.join();
promClient.register.clear();
});
it('should start a metrics server on the provided port', async () => {
const res = await needle(
'get',
`http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
);
const lines = res.body.split('\n');
const first = lines[0];
assert.strictEqual(res.statusCode, 200);
assert(first.startsWith('# HELP'));
});
it('should push metrics for a task execution', async () => {
await task.execute();
const timeValues = await getMetricValues('s3_utapi_custom_task_duration_seconds');
assert.strictEqual(timeValues.length, 1);
const attemptsValues = await getMetricValues('s3_utapi_custom_task_attempts_total');
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, []);
});
it('should push metrics for a failed task execution', async () => {
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
await task.execute();
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
});
it('should allow custom handlers to be registered', async () => {
await task.execute();
const fooValues = await getMetricValues('s3_utapi_custom_task_foo_total');
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
});
});

View File

@ -1,5 +1,4 @@
const assert = require('assert'); const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid'); const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10'); const { Warp10Client } = require('../../../../libV2/warp10');
@ -7,7 +6,6 @@ const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint } = require('../../../../libV2/tasks'); const { CreateCheckpoint } = require('../../../../libV2/tasks');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data'); const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
const _now = Math.floor(new Date().getTime() / 1000); const _now = Math.floor(new Date().getTime() / 1000);
const getTs = delta => convertTimestamp(_now + delta); const getTs = delta => convertTimestamp(_now + delta);
@ -51,16 +49,10 @@ describe('Test CreateCheckpoint', function () {
prefix = uuid.v4(); prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix }); warp10 = new Warp10Client({ nodeId: prefix });
checkpointTask = new CreateCheckpoint({ warp10: [warp10], enableMetrics: true }); checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
await checkpointTask.setup();
checkpointTask._program = { lag: 0, nodeId: prefix }; checkpointTask._program = { lag: 0, nodeId: prefix };
}); });
afterEach(async () => {
await checkpointTask.join();
promClient.register.clear();
});
it('should create checkpoints from events', async () => { it('should create checkpoints from events', async () => {
const start = getTs(-300); const start = getTs(-300);
const stop = getTs(-120); const stop = getTs(-120);
@ -80,7 +72,6 @@ describe('Test CreateCheckpoint', function () {
assert.strictEqual(series.length, 3); assert.strictEqual(series.length, 3);
assertResults(totals, series); assertResults(totals, series);
await assertMetricValue('s3_utapi_create_checkpoint_created_total', series.length);
}); });
it('should only include events not in an existing checkpoint', async () => { it('should only include events not in an existing checkpoint', async () => {

View File

@ -1,5 +1,4 @@
const assert = require('assert'); const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid'); const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10'); const { Warp10Client } = require('../../../../libV2/warp10');
@ -7,7 +6,6 @@ const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint, CreateSnapshot, RepairTask } = require('../../../../libV2/tasks'); const { CreateCheckpoint, CreateSnapshot, RepairTask } = require('../../../../libV2/tasks');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data'); const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
const _now = Math.floor(new Date().getTime() / 1000); const _now = Math.floor(new Date().getTime() / 1000);
const getTs = delta => convertTimestamp(_now + delta); const getTs = delta => convertTimestamp(_now + delta);
@ -56,19 +54,13 @@ describe('Test CreateSnapshot', function () {
checkpointTask = new CreateCheckpoint({ warp10: [warp10] }); checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
checkpointTask._program = { lag: 0, nodeId: prefix }; checkpointTask._program = { lag: 0, nodeId: prefix };
snapshotTask = new CreateSnapshot({ warp10: [warp10], enableMetrics: true }); snapshotTask = new CreateSnapshot({ warp10: [warp10] });
await snapshotTask.setup();
snapshotTask._program = { lag: 0, nodeId: prefix }; snapshotTask._program = { lag: 0, nodeId: prefix };
repairTask = new RepairTask({ warp10: [warp10] }); repairTask = new RepairTask({ warp10: [warp10] });
repairTask._program = { lag: 0, nodeId: prefix }; repairTask._program = { lag: 0, nodeId: prefix };
}); });
afterEach(async () => {
await snapshotTask.join();
promClient.register.clear();
});
it('should create a snapshot from a checkpoint', async () => { it('should create a snapshot from a checkpoint', async () => {
const start = getTs(-300); const start = getTs(-300);
const stop = getTs(-120); const stop = getTs(-120);
@ -88,7 +80,6 @@ describe('Test CreateSnapshot', function () {
assert.strictEqual(series.length, 3); assert.strictEqual(series.length, 3);
assertResults(totals, series); assertResults(totals, series);
await assertMetricValue('s3_utapi_create_snapshot_created_total', series.length);
}); });
it('should create a snapshot from more than one checkpoint', async () => { it('should create a snapshot from more than one checkpoint', async () => {

Some files were not shown because too many files have changed in this diff Show More