Compare commits

..

2 Commits

Author SHA1 Message Date
Taylor McKinnon 954a0cc5a3 fix CacheClient.pushMetric to `await` this._cacheBackend.addToShard 2022-05-06 11:25:11 -07:00
Taylor McKinnon ba07ad48ac wait for ready before returning RedisCache.connect() 2022-05-06 11:24:14 -07:00
76 changed files with 5268 additions and 2267 deletions

View File

@ -1,87 +0,0 @@
# General support information
GitHub Issues are **reserved** for actionable bug reports (including
documentation inaccuracies), and feature requests.
**All questions** (regarding configuration, usecases, performance, community,
events, setup and usage recommendations, among other things) should be asked on
the **[Zenko Forum](http://forum.zenko.io/)**.
> Questions opened as GitHub issues will systematically be closed, and moved to
> the [Zenko Forum](http://forum.zenko.io/).
--------------------------------------------------------------------------------
## Avoiding duplicates
When reporting a new issue/requesting a feature, make sure that we do not have
any duplicates already open:
- search the issue list for this repository (use the search bar, select
"Issues" on the left pane after searching);
- if there is a duplicate, please do not open your issue, and add a comment
to the existing issue instead.
--------------------------------------------------------------------------------
## Bug report information
(delete this section (everything between the lines) if you're not reporting a
bug but requesting a feature)
### Description
Briefly describe the problem you are having in a few paragraphs.
### Steps to reproduce the issue
Please provide steps to reproduce, including full log output
### Actual result
Describe the results you received
### Expected result
Describe the results you expected
### Additional information
- Node.js version,
- Docker version,
- npm version,
- distribution/OS,
- optional: anything else you deem helpful to us.
--------------------------------------------------------------------------------
## Feature Request
(delete this section (everything between the lines) if you're not requesting
a feature but reporting a bug)
### Proposal
Describe the feature
### Current behavior
What currently happens
### Desired behavior
What you would like to happen
### Usecase
Please provide usecases for changing the current behavior
### Additional information
- Is this request for your company? Y/N
- If Y: Company name:
- Are you using any Scality Enterprise Edition products (RING, Zenko EE)? Y/N
- Are you willing to contribute this feature yourself?
- Position/Title:
- How did you hear about us?
--------------------------------------------------------------------------------

View File

@ -1,7 +1,8 @@
FROM ghcr.io/scality/vault:c2607856
FROM registry.scality.com/vault-dev/vault:c2607856
ENV VAULT_DB_BACKEND LEVELDB
RUN chmod 400 tests/utils/keyfile
ENTRYPOINT yarn start

View File

@ -1,65 +0,0 @@
name: build-ci-images
on:
workflow_call:
jobs:
warp10-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: warp10-ci
context: .
file: images/warp10/Dockerfile
lfs: true
redis-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-ci
context: .
file: images/redis/Dockerfile
redis-replica-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
needs:
- redis-ci
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-replica-ci
context: .github/docker/redis-replica
build-args: |
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
vault-ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to GitHub Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ github.token }}
- name: Build and push vault Image
uses: docker/build-push-action@v5
with:
push: true
context: .github/docker/vault
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
cache-from: type=gha,scope=vault
cache-to: type=gha,mode=max,scope=vault

View File

@ -1,16 +0,0 @@
name: build-dev-image
on:
push:
branches-ignore:
- 'development/**'
jobs:
build-dev:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}

View File

@ -1,39 +0,0 @@
name: release-warp10
on:
workflow_dispatch:
inputs:
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: true
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets: inherit
with:
name: warp10
context: .
file: images/warp10/Dockerfile
tag: ${{ github.event.inputs.tag }}
lfs: true
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
tag_name: ${{ github.event.inputs.tag }}-warp10
generate_release_notes: false
target_commitish: ${{ github.sha }}

View File

@ -3,43 +3,44 @@ name: release
on:
workflow_dispatch:
inputs:
dockerfile:
description: Dockerfile to build image from
type: choice
options:
- images/nodesvc-base/Dockerfile
- Dockerfile
required: true
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: false
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
context: .
file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }}
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
- name: Checkout
uses: actions/checkout@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v1
- name: Set up Docker Buildk
uses: docker/setup-buildx-action@v1
- name: Login to Registry
uses: docker/login-action@v1
with:
registry: registry.scality.com
username: ${{ secrets.REGISTRY_LOGIN }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build and push utapi image
uses: docker/build-push-action@v2
with:
context: .
push: true
tags: "registry.scality.com/utapi/utapi:${{ github.event.inputs.tag }}"
- name: Create Release
uses: softprops/action-gh-release@v1
env:
GITHUB_TOKEN: ${{ github.token }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }}
generate_release_notes: true
target_commitish: ${{ github.sha }}

View File

@ -6,43 +6,93 @@ on:
branches-ignore:
- 'development/**'
workflow_dispatch:
inputs:
debug:
description: Debug (enable the ability to SSH to runners)
type: boolean
required: false
default: 'false'
connection-timeout-m:
type: number
required: false
description: Timeout for ssh connection to worker (minutes)
default: 30
jobs:
build-ci:
uses: ./.github/workflows/build-ci.yaml
build:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v2.3.4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v1.6.0
- name: Login to GitHub Registry
uses: docker/login-action@v1.10.0
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Login to Scality Registry
uses: docker/login-action@v1.10.0
with:
registry: registry.scality.com
username: ${{ secrets.REGISTRY_LOGIN }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build and push redis CI image
uses: docker/build-push-action@v2.7.0
with:
push: true
file: images/redis/Dockerfile
context: '.'
tags: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
cache-from: type=gha,scope=redis
cache-to: type=gha,mode=max,scope=redis
- name: Build and push redis replica CI image
uses: docker/build-push-action@v2.7.0
with:
push: true
context: .github/docker/redis-replica
build-args: |
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
tags: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
cache-from: type=gha,scope=redis-replica
cache-to: type=gha,mode=max,scope=redis-replica
- name: Build and push warp10 Image
uses: docker/build-push-action@v2.7.0
with:
push: true
file: images/warp10/Dockerfile
context: '.'
tags: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
cache-from: type=gha,scope=warp10
cache-to: type=gha,mode=max,scope=warp10
- name: Build and push vault Image
uses: docker/build-push-action@v2.7.0
with:
push: true
context: '.github/docker/vault'
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
cache-from: type=gha,scope=vault
cache-to: type=gha,mode=max,scope=vault
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
run: yarn install --frozen-lockfile
- name: run static analysis tools on markdown
run: yarn run lint_md
- name: run static analysis tools on code
run: yarn run lint
tests-v1:
needs:
- build-ci
tests:
needs: build
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
@ -53,18 +103,17 @@ jobs:
test:
- name: run unit tests
command: yarn test
env:
UTAPI_METRICS_ENABLED: 'true'
- name: run v1 client tests
env: {}
- name: run client tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
env: {}
- name: run v1 server tests
- name: run server tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:server
env: {}
- name: run v1 cron tests
- name: run cron tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:cron
env: {}
- name: run v1 interval tests
- name: run interval tests
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:interval
env: {}
services:
@ -88,7 +137,7 @@ jobs:
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
image: bitnami/redis-sentinel:6.2
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
@ -119,133 +168,32 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
- uses: actions/setup-python@v2
with:
python-version: '3.9'
cache: pip
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip
- name: Install python deps
run: pip install -r requirements.txt
run: |
pip install requests
pip install redis
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
run: yarn install --frozen-lockfile
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
tests-v2-with-vault:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 for 60 seconds
run: sleep 60
- name: run v2 functional tests
run: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
tests-v2-without-sensision:
needs:
- build-ci
tests-with-vault:
needs: build
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
@ -254,6 +202,13 @@ jobs:
fail-fast: false
matrix:
test:
- name: run v2 functional tests
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: run v2 soft limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:softLimit
env:
@ -287,7 +242,7 @@ jobs:
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
image: bitnami/redis-sentinel:6.2
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
@ -304,6 +259,7 @@ jobs:
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
ports:
- 4802:4802
- 8082:8082
@ -329,33 +285,31 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v2
with:
lfs: true
- uses: actions/setup-node@v4
- uses: actions/setup-node@v2
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
- uses: actions/setup-python@v2
with:
python-version: '3.9'
cache: pip
- uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip
- name: Install python deps
run: pip install -r requirements.txt
run: |
pip install requests
pip install redis
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
run: yarn install --frozen-lockfile
- name: Wait for warp10 a little bit
run: sleep 60
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
if: failure()

View File

@ -1,31 +0,0 @@
FROM node:16.13.2-buster-slim
WORKDIR /usr/src/app
COPY package.json yarn.lock /usr/src/app/
RUN apt-get update \
&& apt-get install -y \
curl \
gnupg2
RUN curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
RUN apt-get update \
&& apt-get install -y jq git python3 build-essential yarn --no-install-recommends \
&& yarn cache clean \
&& yarn install --frozen-lockfile --production --ignore-optional --network-concurrency=1 \
&& apt-get autoremove --purge -y python3 git build-essential \
&& rm -rf /var/lib/apt/lists/* \
&& yarn cache clean \
&& rm -rf ~/.node-gyp \
&& rm -rf /tmp/yarn-*
# Keep the .git directory in order to properly report version
COPY . /usr/src/app
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
CMD [ "yarn", "start" ]
EXPOSE 8100

View File

@ -3,8 +3,9 @@
![Utapi logo](res/utapi-logo.png)
[![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi)
[![Scality CI][badgepriv]](http://ci.ironmann.io/gh/scality/utapi)
Service Utilization API for tracking resource usage and metrics reporting.
Service Utilization API for tracking resource usage and metrics reporting
## Design
@ -87,13 +88,13 @@ Server is running.
1. Create an IAM user
```
aws iam --endpoint-url <endpoint> create-user --user-name <user-name>
aws iam --endpoint-url <endpoint> create-user --user-name utapiuser
```
2. Create access key for the user
```
aws iam --endpoint-url <endpoint> create-access-key --user-name <user-name>
aws iam --endpoint-url <endpoint> create-access-key --user-name utapiuser
```
3. Define a managed IAM policy
@ -202,11 +203,12 @@ Server is running.
5. Attach user to the managed policy
```
aws --endpoint-url <endpoint> iam attach-user-policy --user-name
<user-name> --policy-arn <policy arn>
aws --endpoint-url <endpoint> iam attach-user-policy --user-name utapiuser
--policy-arn <policy arn>
```
Now the user has access to ListMetrics request in Utapi on all buckets.
Now the user `utapiuser` has access to ListMetrics request in Utapi on all
buckets.
### Signing request with Auth V4
@ -222,18 +224,16 @@ following urls for reference.
You may also view examples making a request with Auth V4 using various languages
and AWS SDKs [here](/examples).
Alternatively, you can use a nifty command line tool available in Scality's
CloudServer.
Alternatively, you can use a nifty command line tool available in Scality's S3.
You can git clone the CloudServer repo from here
https://github.com/scality/cloudserver and follow the instructions in the README
to install the dependencies.
You can git clone S3 repo from here https://github.com/scality/S3.git and follow
the instructions in README to install the dependencies.
If you have CloudServer running inside a docker container you can docker exec
into the CloudServer container as
If you have S3 running inside a docker container you can docker exec into the S3
container as
```
docker exec -it <container-id> bash
docker exec -it <container id> bash
```
and then run the command
@ -271,7 +271,7 @@ Usage: list_metrics [options]
-v, --verbose
```
An example call to list metrics for a bucket `demo` to Utapi in a https enabled
A typical call to list metrics for a bucket `demo` to Utapi in a https enabled
deployment would be
```
@ -283,7 +283,7 @@ Both start and end times are time expressed as UNIX epoch timestamps **expressed
in milliseconds**.
Keep in mind, since Utapi metrics are normalized to the nearest 15 min.
interval, start time and end time need to be in the specific format as follows.
interval, so start time and end time need to be in specific format as follows.
#### Start time
@ -297,7 +297,7 @@ Date: Tue Oct 11 2016 17:35:25 GMT-0700 (PDT)
Unix timestamp (milliseconds): 1476232525320
Here's an example JS method to get a start timestamp
Here's a typical JS method to get start timestamp
```javascript
function getStartTimestamp(t) {
@ -317,7 +317,7 @@ seconds and milliseconds set to 59 and 999 respectively. So valid end timestamps
would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and
`09:59:59:999`.
Here's an example JS method to get an end timestamp
Here's a typical JS method to get end timestamp
```javascript
function getEndTimestamp(t) {
@ -342,3 +342,4 @@ In order to contribute, please follow the
https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md).
[badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg
[badgepriv]: http://ci.ironmann.io/gh/scality/utapi.svg?style=svg

View File

@ -27,7 +27,7 @@ x-models:
services:
redis-0:
image: redis:7.2.4
image: redis:5
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
@ -35,7 +35,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:7.2.4
image: redis:5
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
@ -43,7 +43,7 @@ services:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:7.2.4
image: redis:5
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379

View File

@ -1,47 +0,0 @@
#!/bin/bash
# set -e stops the execution of a script if a command or pipeline has an error
set -e
# modifying config.json
JQ_FILTERS_CONFIG="."
if [[ "$LOG_LEVEL" ]]; then
if [[ "$LOG_LEVEL" == "info" || "$LOG_LEVEL" == "debug" || "$LOG_LEVEL" == "trace" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .log.logLevel=\"$LOG_LEVEL\""
echo "Log level has been modified to $LOG_LEVEL"
else
echo "The log level you provided is incorrect (info/debug/trace)"
fi
fi
if [[ "$WORKERS" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .workers=\"$WORKERS\""
fi
if [[ "$REDIS_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.host=\"$REDIS_HOST\""
fi
if [[ "$REDIS_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.port=\"$REDIS_PORT\""
fi
if [[ "$VAULTD_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.host=\"$VAULTD_HOST\""
fi
if [[ "$VAULTD_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.port=\"$VAULTD_PORT\""
fi
if [[ "$HEALTHCHECKS_ALLOWFROM" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .healthChecks.allowFrom=[\"$HEALTHCHECKS_ALLOWFROM\"]"
fi
if [[ $JQ_FILTERS_CONFIG != "." ]]; then
jq "$JQ_FILTERS_CONFIG" config.json > config.json.tmp
mv config.json.tmp config.json
fi
exec "$@"

View File

@ -1,42 +0,0 @@
# Utapi Release Plan
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
* Namespace: ghcr.io/scality/utapi
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
This allows those images to be used by developers, CI builds,
build chain and so on.
Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
```
## Release Process
To release a production image:
* Name the tag for the repository and Docker image.
* Use the `yarn version` command with the same tag to update `package.json`.
* Create a PR and merge the `package.json` change.
* Tag the repository using the same tag.
* [Force a build] using:
* A given branch that ideally matches the tag.
* The `release` stage.
* An extra property with the name `tag` and its value being the actual tag.
[Force a build]:
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force

View File

@ -1,90 +0,0 @@
import sys, os, base64, datetime, hashlib, hmac, datetime, calendar, json
import requests # pip install requests
access_key = '9EQTVVVCLSSG6QBMNKO5'
secret_key = 'T5mK/skkkwJ/mTjXZnHyZ5UzgGIN=k9nl4dyTmDH'
method = 'POST'
service = 's3'
host = 'localhost:8100'
region = 'us-east-1'
canonical_uri = '/buckets'
canonical_querystring = 'Action=ListMetrics&Version=20160815'
content_type = 'application/x-amz-json-1.0'
algorithm = 'AWS4-HMAC-SHA256'
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d')
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
def get_start_time(t):
start = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(start.utctimetuple()) * 1000;
def get_end_time(t):
end = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(end.utctimetuple()) * 1000 - 1;
start_time = get_start_time(datetime.datetime(2016, 1, 1, 0, 0, 0, 0))
end_time = get_end_time(datetime.datetime(2016, 2, 1, 0, 0, 0, 0))
# Request parameters for listing Utapi bucket metrics--passed in a JSON block.
bucketListing = {
'buckets': [ 'utapi-test' ],
'timeRange': [ start_time, end_time ],
}
request_parameters = json.dumps(bucketListing)
payload_hash = hashlib.sha256(request_parameters).hexdigest()
canonical_headers = \
'content-type:{0}\nhost:{1}\nx-amz-content-sha256:{2}\nx-amz-date:{3}\n' \
.format(content_type, host, payload_hash, amz_date)
signed_headers = 'content-type;host;x-amz-content-sha256;x-amz-date'
canonical_request = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}' \
.format(method, canonical_uri, canonical_querystring, canonical_headers,
signed_headers, payload_hash)
credential_scope = '{0}/{1}/{2}/aws4_request' \
.format(date_stamp, region, service)
string_to_sign = '{0}\n{1}\n{2}\n{3}' \
.format(algorithm, amz_date, credential_scope,
hashlib.sha256(canonical_request).hexdigest())
signing_key = getSignatureKey(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
hashlib.sha256).hexdigest()
authorization_header = \
'{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}' \
.format(algorithm, access_key, credential_scope, signed_headers, signature)
# The 'host' header is added automatically by the Python 'requests' library.
headers = {
'Content-Type': content_type,
'X-Amz-Content-Sha256': payload_hash,
'X-Amz-Date': amz_date,
'Authorization': authorization_header
}
endpoint = 'http://' + host + canonical_uri + '?' + canonical_querystring;
r = requests.post(endpoint, data=request_parameters, headers=headers)
print (r.text)

View File

@ -1,20 +0,0 @@
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json
WORKDIR ${HOME_DIR}/utapi
COPY ./package.json ./yarn.lock ${HOME_DIR}/utapi
# Remove when gitcache is sorted out
RUN rm /root/.gitconfig
RUN yarn install --production --frozen-lockfile --network-concurrency 1
COPY . ${HOME_DIR}/utapi
RUN chown -R ${USER} ${HOME_DIR}/utapi
USER ${USER}
CMD bash -c "source ${CONF_DIR}/env && export && supervisord -c ${CONF_DIR}/${SUPERVISORD_CONF}"

1
images/warp10/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
*.jar filter=lfs diff=lfs merge=lfs -text

View File

@ -1,2 +0,0 @@
standalone.host = 0.0.0.0
standalone.port = 4802

View File

@ -13,7 +13,7 @@ RUN apk add zip unzip build-base \
&& cd .. \
&& go build -a -o /usr/local/go/warp10_sensision_exporter
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
FROM registry.scality.com/utapi/warp10:2.8.1-95-g73e7de80
# Override baked in version
# Remove when updating to a numbered release
@ -27,6 +27,8 @@ ENV SENSISION_DATA_DIR /data/sensision
ENV SENSISION_PORT 8082
# Modify Warp 10 default config
ENV standalone.host 0.0.0.0
ENV standalone.port 4802
ENV standalone.home /opt/warp10
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens
@ -51,6 +53,6 @@ COPY --from=builder /usr/local/go/warp10_sensision_exporter /usr/local/bin/warp1
ADD ./images/warp10/s6 /etc
ADD ./warpscript /usr/local/share/warpscript
ADD ./images/warp10/static.tokens /
ADD ./images/warp10/90-default-host-port.conf $WARP10_CONF_TEMPLATES/90-default-host-port.conf
CMD /init

View File

@ -3,8 +3,9 @@
JAVA="/usr/bin/java"
JAVA_OPTS=""
VERSION=1.0.23
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${SENSISION_VERSION}.jar
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${VERSION}.jar
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
SENSISION_CLASS=io.warp10.sensision.Main
export MALLOC_ARENA_MAX=1

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:389d2135867c399a389901460c5f2cc09f4857d0c6d08632c2638c25fb150c46
size 15468553

View File

@ -1,4 +1,5 @@
/* eslint-disable global-require */
// eslint-disable-line strict
let toExport;

View File

@ -1,13 +1,35 @@
/* eslint-disable no-bitwise */
const assert = require('assert');
const fs = require('fs');
const path = require('path');
/**
* Reads from a config file and returns the content as a config object
*/
class Config {
constructor(config) {
this.component = config.component;
constructor() {
/*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
this._basePath = path.resolve(__dirname, '..');
this.path = `${this._basePath}/config.json`;
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
this.path = process.env.UTAPI_CONFIG_FILE;
}
// Read config automatically
this._getConfig();
}
_getConfig() {
let config;
try {
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
config = JSON.parse(data);
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
this.port = 9500;
if (config.port !== undefined) {
@ -93,13 +115,6 @@ class Config {
}
}
if (config.vaultclient) {
// Instance passed from outside
this.vaultclient = config.vaultclient;
this.vaultd = null;
} else {
// Connection data
this.vaultclient = null;
this.vaultd = {};
if (config.vaultd) {
if (config.vaultd.port !== undefined) {
@ -114,7 +129,6 @@ class Config {
this.vaultd.host = config.vaultd.host;
}
}
}
if (config.certFilePaths) {
assert(typeof config.certFilePaths === 'object'
@ -127,11 +141,12 @@ class Config {
const { key, cert, ca } = config.certFilePaths
? config.certFilePaths : {};
if (key && cert) {
const keypath = key;
const certpath = cert;
const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
const certpath = (cert[0] === '/')
? cert : `${this._basePath}/${cert}`;
let capath;
if (ca) {
capath = ca;
capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`);
}
@ -157,13 +172,8 @@ class Config {
+ 'expireMetrics must be a boolean');
this.expireMetrics = config.expireMetrics;
}
if (config.onlyCountLatestWhenObjectLocked !== undefined) {
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
}
return config;
}
}
module.exports = Config;
module.exports = new Config();

View File

@ -81,17 +81,6 @@ class Datastore {
return this._client.call((backend, done) => backend.incr(key, done), cb);
}
/**
* increment value of a key by the provided value
* @param {string} key - key holding the value
* @param {string} value - value containing the data
* @param {callback} cb - callback
* @return {undefined}
*/
incrby(key, value, cb) {
return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
}
/**
* decrement value of a key by 1
* @param {string} key - key holding the value

View File

@ -6,6 +6,8 @@ const async = require('async');
const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const Vault = require('./Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -21,6 +23,7 @@ class ListMetrics {
constructor(metric, component) {
this.metric = metric;
this.service = component;
this.vault = new Vault(config);
}
/**
@ -80,10 +83,9 @@ class ListMetrics {
const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
@ -122,11 +124,10 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids
if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => {
return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
@ -312,10 +313,11 @@ class ListMetrics {
});
if (!areMetricsPositive) {
log.info('negative metric value found', {
error: resource,
method: 'ListMetrics.getMetrics',
});
return cb(errors.InternalError.customizeDescription(
'Utapi is in a transient state for this time period as '
+ 'metrics are being collected. Please try again in a few '
+ 'minutes.',
));
}
/**
* Batch result is of the format

View File

@ -99,7 +99,6 @@ const metricObj = {
buckets: 'bucket',
accounts: 'accountId',
users: 'userId',
location: 'location',
};
class UtapiClient {
@ -123,17 +122,13 @@ class UtapiClient {
const api = (config || {}).logApi || werelogs;
this.log = new api.Logger('UtapiClient');
// By default, we push all resource types
this.metrics = ['buckets', 'accounts', 'users', 'service', 'location'];
this.metrics = ['buckets', 'accounts', 'users', 'service'];
this.service = 's3';
this.disableOperationCounters = false;
this.enabledOperationCounters = [];
this.disableClient = true;
if (config && !config.disableClient) {
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
if (config.metrics) {
const message = 'invalid property in UtapiClient configuration';
assert(Array.isArray(config.metrics), `${message}: metrics `
@ -161,6 +156,9 @@ class UtapiClient {
if (config.enabledOperationCounters) {
this.enabledOperationCounters = config.enabledOperationCounters;
}
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
}
}
@ -548,9 +546,7 @@ class UtapiClient {
if (this._isCounterEnabled(counterAction)) {
cmds.push(['incr', generateKey(p, counterAction, timestamp)]);
}
cmds.push(['zrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp]);
});
return this.ds.batch(cmds, (err, results) => {
if (err) {
log.error('error pushing metric', {
@ -584,48 +580,13 @@ class UtapiClient {
// empty.
actionCounter = Number.isNaN(actionCounter)
|| actionCounter < 0 ? 1 : actionCounter;
if (Number.isInteger(params.byteLength)) {
/* byteLength is passed in from cloudserver under the follow conditions:
* - bucket versioning is suspended
* - object version id is null
* - the content length of the object exists
* In this case, the master key is deleted and replaced with a delete marker.
* The decrement accounts for the deletion of the master key when utapi reports
* on the number of objects.
*/
actionCounter -= 1;
}
const key = generateStateKey(p, 'numberOfObjects');
const byteArr = results[index + commandsGroupSize - 1][1];
const oldByteLength = byteArr ? parseInt(byteArr[0], 10) : 0;
const newByteLength = member.serialize(Math.max(0, oldByteLength - params.byteLength));
cmds2.push(
['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(actionCounter)],
);
if (Number.isInteger(params.byteLength)) {
cmds2.push(
['decr', generateCounter(p, 'numberOfObjectsCounter')],
['decrby', generateCounter(p, 'storageUtilizedCounter'), params.byteLength],
);
}
if (byteArr) {
cmds2.push(
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), timestamp, newByteLength],
);
}
return true;
});
if (noErr) {
return this.ds.batch(cmds2, cb);
}
@ -1156,69 +1117,6 @@ class UtapiClient {
});
}
/**
*
* @param {string} location - name of data location
* @param {number} updateSize - size in bytes to update location metric by,
* could be negative, indicating deleted object
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
pushLocationMetric(location, updateSize, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
this._checkMetricTypes(params);
const action = (updateSize < 0) ? 'decrby' : 'incrby';
const size = (updateSize < 0) ? -updateSize : updateSize;
return this.ds[action](generateKey(params, 'locationStorage'), size,
err => {
if (err) {
log.error('error pushing metric', {
method: 'UtapiClient.pushLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
return callback();
});
}
/**
*
* @param {string} location - name of data backend to get metric for
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
getLocationMetric(location, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
const redisKey = generateKey(params, 'locationStorage');
return this.ds.get(redisKey, (err, bytesStored) => {
if (err) {
log.error('error getting metric', {
method: 'UtapiClient: getLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
// if err and bytesStored are null, key does not exist yet
if (bytesStored === null) {
return callback(null, 0);
}
return callback(null, bytesStored);
});
}
/**
* Get storage used by bucket/account/user/service
* @param {object} params - params for the metrics

View File

@ -16,19 +16,15 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex {
constructor(config) {
this._enabled = false;
this._schedule = REINDEX_SCHEDULE;
this._redis = {
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
this._sentinel = {
host: '127.0.0.1',
port: 16379,
}],
name: 'scality-s3',
sentinelPassword: '',
};
this._bucketd = {
host: '127.0.0.1',
@ -46,13 +42,14 @@ class UtapiReindex {
if (config && config.password) {
this._password = config.password;
}
if (config && config.redis) {
if (config && config.sentinel) {
const {
name, sentinelPassword, sentinels,
} = config.redis;
this._redis.name = name || this._redis.name;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._redis.sentinels = sentinels || this._redis.sentinels;
host, port, name, sentinelPassword,
} = config.sentinel;
this._sentinel.host = host || this._sentinel.host;
this._sentinel.port = port || this._sentinel.port;
this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
}
if (config && config.bucketd) {
const { host, port } = config.bucketd;
@ -64,16 +61,17 @@ class UtapiReindex {
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
this._requestLogger = this._log.newRequestLogger();
}
_getRedisClient() {
const client = new RedisClient({
sentinels: this._redis.sentinels,
name: this._redis.name,
sentinelPassword: this._redis.sentinelPassword,
sentinels: [{
host: this._sentinel.host,
port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password,
});
client.connect();
@ -88,18 +86,17 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY);
}
_buildFlags(sentinel) {
_buildFlags() {
const flags = {
/* eslint-disable camelcase */
sentinel_ip: sentinel.host,
sentinel_port: sentinel.port,
sentinel_cluster_name: this._redis.name,
sentinel_ip: this._sentinel.host,
sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._sentinel.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
};
if (this._redis.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword;
if (this._sentinel.sentinelPassword) {
flags.redis_password = this._sentinel.sentinelPassword;
}
/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
@ -108,15 +105,11 @@ class UtapiReindex {
opts.push(name);
opts.push(flags[flag]);
});
if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}
_runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(remainingSentinels.shift());
_runScript(path, done) {
const flags = this._buildFlags();
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => {
@ -143,17 +136,6 @@ class UtapiReindex {
statusCode: code,
script: path,
});
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else {
this._requestLogger.info('script exited successfully', {
statusCode: code,
@ -164,11 +146,6 @@ class UtapiReindex {
});
}
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock()

View File

@ -14,15 +14,6 @@ class UtapiRequest {
this._datastore = null;
this._requestQuery = null;
this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
}
/**

View File

@ -1,21 +1,16 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests
import redis
import json
import ast
import sys
from threading import Thread
import time
import urllib
import re
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
import argparse
def get_options():
parser = argparse.ArgumentParser()
@ -34,19 +29,8 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password
r = redis.Redis(
host=ip,
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
r = redis.Redis(host=ip, port=port, db=0, password=password)
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)

View File

@ -1,6 +1,5 @@
import argparse
import concurrent.futures as futures
import functools
import itertools
import json
import logging
@ -9,9 +8,9 @@ import re
import sys
import time
import urllib
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint
import redis
import requests
@ -25,9 +24,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -36,38 +32,9 @@ def get_options():
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
return parser.parse_args()
def chunks(iterable, size):
it = iter(iterable)
@ -82,7 +49,7 @@ def _encoded(func):
return urllib.parse.quote(val.encode('utf-8'))
return inner
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])
Bucket = namedtuple('Bucket', ['userid', 'name'])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
@ -94,21 +61,15 @@ class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient:
'''Performs Listing calls against bucketd'''
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__url_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
def __init__(self, bucketd_addr=None, max_retries=2):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()
def _do_req(self, url, check_500=True, **kwargs):
@ -140,7 +101,7 @@ class BucketDClient:
parameters value. On the first request the function will be called with
`None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
@ -153,9 +114,6 @@ class BucketDClient:
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
@ -177,37 +135,7 @@ class BucketDClient:
else:
is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def list_buckets(self, name = None):
def get_next_marker(p):
if p is None:
@ -219,24 +147,19 @@ class BucketDClient:
'maxKeys': 1000,
'marker': get_next_marker
}
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
for result in payload['Contents']:
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
bucket = Bucket(*match.groups())
if name is None or bucket.name == name:
buckets.append(bucket)
if buckets:
yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@ -273,12 +196,18 @@ class BucketDClient:
upload_id=key['value']['UploadId']))
return keys
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
def _sum_objects(self, bucket, listing):
count = 0
total_size = 0
last_key = None
try:
for obj in listing:
last_master = None
last_size = None
for status_code, payload in listing:
contents = payload['Contents'] if isinstance(payload, dict) else payload
if contents is None:
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
raise InvalidListing(bucket)
for obj in contents:
count += 1
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
@ -287,51 +216,39 @@ class BucketDClient:
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)
is_latest = obj['key'] != last_key
last_key = obj['key']
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue
count += 1
total_size += size
except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size
# If versioned, subtract the size of the master to avoid double counting
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
_log.debug('Detected versioned key: %s - subtracting master size: %i'% (
obj['key'],
last_size,
))
total_size -= last_size
count -= 1
last_master = None
def _extract_listing(self, key, listing):
for status_code, payload in listing:
contents = payload[key] if isinstance(payload, dict) else payload
if contents is None:
raise InvalidListing('')
for obj in contents:
yield obj
# Only save master versions
elif '\x00' not in obj['key']:
last_master = obj['key']
last_size = size
return count, total_size
def count_bucket_contents(self, bucket):
def get_key_marker(p):
if p is None:
def get_next_marker(p):
if p is None or len(p) == 0:
return ''
return p.get('NextKeyMarker', '')
def get_vid_marker(p):
if p is None:
return ''
return p.get('NextVersionIdMarker', '')
return p[-1].get('key', '')
params = {
'listingType': 'DelimiterVersions',
'listingType': 'Basic',
'maxKeys': 1000,
'keyMarker': get_key_marker,
'versionIdMarker': get_vid_marker,
'gt': get_next_marker,
}
listing = self._list_bucket(bucket.name, **params)
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
return BucketContents(
bucket=bucket,
obj_count=count,
@ -339,8 +256,7 @@ class BucketDClient:
)
def count_mpu_parts(self, mpu):
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
def get_prefix(p):
if p is None:
@ -360,31 +276,13 @@ class BucketDClient:
'listingType': 'Delimiter',
}
listing = self._list_bucket(shadow_bucket_name, **params)
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
return BucketContents(
bucket=shadow_bucket,
bucket=mpu.bucket._replace(name=_bucket),
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket):
'''
@ -424,16 +322,9 @@ def get_redis_client(options):
host=options.sentinel_ip,
port=options.sentinel_port,
db=0,
password=options.redis_password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
password=options.redis_password
)
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis(
host=ip,
port=port,
@ -467,24 +358,16 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__':
options = get_options()
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator:
for batch in bucket_client.list_buckets(options.bucket):
bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()):
@ -503,33 +386,22 @@ if __name__ == '__main__':
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
if options.bucket is None:
stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket not in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
else:
stale_buckets = set()
_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
@ -538,19 +410,9 @@ if __name__ == '__main__':
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
else:
if options.bucket is None:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
@ -559,25 +421,13 @@ if __name__ == '__main__':
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:

View File

@ -68,10 +68,10 @@ const keys = {
*/
function getSchemaPrefix(params, timestamp) {
const {
bucket, accountId, userId, level, service, location,
bucket, accountId, userId, level, service,
} = params;
// `service` property must remain last because other objects also include it
const id = bucket || accountId || userId || location || service;
const id = bucket || accountId || userId || service;
const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:`
: `${service}:${level}:${id}:`;
return prefix;
@ -86,13 +86,9 @@ function getSchemaPrefix(params, timestamp) {
*/
function generateKey(params, metric, timestamp) {
const prefix = getSchemaPrefix(params, timestamp);
if (params.location) {
return `${prefix}locationStorage`;
}
return keys[metric](prefix);
}
/**
* Returns a list of the counters for a metric type
* @param {object} params - object with metric type and id as a property

View File

@ -7,6 +7,7 @@ const { Clustering, errors, ipCheck } = require('arsenal');
const arsenalHttps = require('arsenal').https;
const { Logger } = require('werelogs');
const config = require('./Config');
const routes = require('../router/routes');
const Route = require('../router/Route');
const Router = require('../router/Router');
@ -27,12 +28,7 @@ class UtapiServer {
constructor(worker, port, datastore, logger, config) {
this.worker = worker;
this.port = port;
this.vault = config.vaultclient;
if (!this.vault) {
const Vault = require('./Vault');
this.vault = new Vault(config);
}
this.router = new Router(config, this.vault);
this.router = new Router(config);
this.logger = logger;
this.datastore = datastore;
this.server = null;
@ -75,7 +71,6 @@ class UtapiServer {
req.socket.setNoDelay();
const { query, path, pathname } = url.parse(req.url, true);
const utapiRequest = new UtapiRequest()
.setVault(this.vault)
.setRequest(req)
.setLog(this.logger.newRequestLogger())
.setResponse(res)
@ -219,7 +214,8 @@ class UtapiServer {
* @property {object} params.log - logger configuration
* @return {undefined}
*/
function spawn(config) {
function spawn(params) {
Object.assign(config, params);
const {
workers, redis, log, port,
} = config;

View File

@ -19,7 +19,9 @@ class RedisCache {
moduleLogger.debug('Connecting to redis...');
this._redis = new RedisClient(this._options);
this._redis.connect();
return true;
return new Promise(resolve => {
this._redis.once('ready', () => resolve(true));
});
}
async disconnect() {

View File

@ -23,6 +23,10 @@
"healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"]
},
"vaultd": {
"host": "127.0.0.1",
"port": 8500
},
"cacheBackend": "memory",
"development": false,
"nodeId": "single_node",
@ -50,15 +54,5 @@
"filter": {
"allow": {},
"deny": {}
},
"metrics" : {
"enabled": false,
"host": "localhost",
"ingestPort": 10902,
"checkpointPort": 10903,
"snapshotPort": 10904,
"diskUsagePort": 10905,
"reindexPort": 10906,
"repairPort": 10907
}
}

View File

@ -2,8 +2,6 @@ const fs = require('fs');
const path = require('path');
const Joi = require('@hapi/joi');
const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const {
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
@ -73,6 +71,7 @@ class Config {
constructor(overrides) {
this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this._defaultsPath = path.join(__dirname, 'defaults.json');
this.host = undefined;
this.port = undefined;
@ -90,11 +89,6 @@ class Config {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
}
Object.assign(this, parsedConfig);
werelogs.configure({
level: Config.logging.level,
dump: Config.logging.dumpLevel,
});
}
static _readFile(path, encoding = 'utf-8') {
@ -119,7 +113,7 @@ class Config {
}
_loadDefaults() {
return defaults;
return Config._readJSON(this._defaultsPath);
}
_loadUserConfig() {
@ -198,10 +192,6 @@ class Config {
`${prefix}_SENTINEL_PASSWORD`,
config.sentinelPassword,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
} else {
redisConf.host = _loadFromEnv(
`${prefix}_HOST`,
@ -385,17 +375,6 @@ class Config {
parsedConfig.filter = Config._parseResourceFilters(config.filter);
parsedConfig.metrics = {
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
};
return parsedConfig;
}

View File

@ -115,16 +115,7 @@ const schema = Joi.object({
return filterObj;
}, {},
)),
metrics: {
enabled: Joi.boolean(),
host: Joi.string(),
ingestPort: Joi.number().port(),
checkpointPort: Joi.number().port(),
snapshotPort: Joi.number().port(),
diskUsagePort: Joi.number().port(),
reindexPort: Joi.number().port(),
repairPort: Joi.number().port(),
},
});
module.exports = schema;

View File

@ -22,7 +22,6 @@ const constants = {
'deleteBucketEncryption',
'deleteBucketLifecycle',
'deleteBucketReplication',
'deleteBucketTagging',
'deleteBucketWebsite',
'deleteObject',
'deleteObjectTagging',
@ -35,7 +34,6 @@ const constants = {
'getBucketObjectLock',
'getBucketReplication',
'getBucketVersioning',
'getBucketTagging',
'getBucketWebsite',
'getObject',
'getObjectAcl',
@ -57,7 +55,6 @@ const constants = {
'putBucketObjectLock',
'putBucketReplication',
'putBucketVersioning',
'putBucketTagging',
'putBucketWebsite',
'putDeleteMarkerObject',
'putObject',

View File

@ -1,5 +1,5 @@
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
const bucketclient = require('bucketclient');
const { BucketClientInterface } = require('arsenal').storage.metadata.bucketclient;
const config = require('../config');
const { LoggerContext } = require('../utils');

View File

@ -1,6 +1,6 @@
/* eslint-disable no-restricted-syntax */
const arsenal = require('arsenal');
const async = require('async');
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');
@ -12,14 +12,9 @@ const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
const PAGE_SIZE = 1000;
async function _listingWrapper(bucket, params) {
function _listingWrapper(bucket, params) {
return new Promise(
(resolve, reject) => metadata.listObject(
bucket,
@ -46,7 +41,7 @@ function _listObject(bucket, prefix, hydrateFunc) {
try {
// eslint-disable-next-line no-await-in-loop
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt });
res = await _listingWrapper(bucket, { ...listingParams, gt });
} catch (error) {
moduleLogger.error('Error during listing', { error });
throw error;
@ -108,7 +103,7 @@ function bucketExists(bucket) {
bucket,
logger.newRequestLogger(),
err => {
if (err && (!err.is || !err.is.NoSuchBucket)) {
if (err && !err.NoSuchBucket) {
reject(err);
return;
}

View File

@ -3,7 +3,6 @@ const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const { apiOperations } = require('../server/spec');
const ResponseContainer = require('./ResponseContainer');
const { httpRequestDurationSeconds } = require('../server/metrics');
const apiTags = Object.keys(apiOperations);
const apiOperationIds = Object.values(apiOperations)
@ -22,7 +21,6 @@ const contextSchema = {
logger: Joi.any(),
request: Joi.any(),
results: Joi.any(),
requestTimer: Joi.any(),
};
const RequestContextModel = buildModel('RequestContext', contextSchema);
@ -36,10 +34,6 @@ class RequestContext extends RequestContextModel {
const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation;
const requestTimer = tag !== 'internal'
? httpRequestDurationSeconds.startTimer({ action: operationId })
: null;
request.logger.logger.addDefaultFields({
tag,
operationId,
@ -56,7 +50,6 @@ class RequestContext extends RequestContextModel {
encrypted,
results: new ResponseContainer(),
logger: request.logger,
requestTimer,
});
}

View File

@ -6,8 +6,7 @@ const BackOff = require('backo');
const { whilst } = require('async');
const errors = require('./errors');
const { LoggerContext } = require('./utils/log');
const { asyncOrCallback } = require('./utils/func');
const { LoggerContext, asyncOrCallback } = require('./utils');
const moduleLogger = new LoggerContext({
module: 'redis',

View File

@ -28,7 +28,6 @@ class UtapiServer extends Process {
app.use(middleware.loggerMiddleware);
await initializeOasTools(spec, app);
app.use(middleware.errorMiddleware);
app.use(middleware.httpMetricsMiddleware);
app.use(middleware.responseLoggerMiddleware);
return app;
}

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 's3_utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 's3_utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s
buckets: [0.0001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0],
});
module.exports = {
httpRequestDurationSeconds,
httpRequestsTotal,
};

View File

@ -6,7 +6,6 @@ const config = require('../config');
const { logger, buildRequestLogger } = require('../utils');
const errors = require('../errors');
const { translateAndAuthorize } = require('../vault');
const metricHandlers = require('./metrics');
const oasOptions = {
controllers: path.join(__dirname, './API/'),
@ -56,23 +55,6 @@ function responseLoggerMiddleware(req, res, next) {
}
}
function httpMetricsMiddleware(request, response, next) {
// If the request.ctx is undefined then this is an internal oasTools request (/_/docs)
// No metrics should be pushed
if (config.metrics.enabled && request.ctx && request.ctx.tag !== 'internal') {
metricHandlers.httpRequestsTotal
.labels({
action: request.ctx.operationId,
code: response.statusCode,
}).inc(1);
request.ctx.requestTimer({ code: response.statusCode });
}
if (next) {
next();
}
}
// next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) {
@ -100,7 +82,7 @@ function errorMiddleware(err, req, res, next) {
code,
message,
});
responseLoggerMiddleware(req, res, () => httpMetricsMiddleware(req, res));
responseLoggerMiddleware(req, res);
}
// eslint-disable-next-line no-unused-vars
@ -137,7 +119,7 @@ async function authV4Middleware(request, response, params) {
} catch (error) {
request.logger.error('error during authentication', { error });
// rethrow any access denied errors
if ((error.is && error.is.AccessDenied) || (error.utapiError && error.AccessDenied)) {
if (error.AccessDenied) {
throw error;
}
throw errors.InternalError;
@ -176,6 +158,5 @@ module.exports = {
responseLoggerMiddleware,
authV4Middleware,
clientIpLimitMiddleware,
httpMetricsMiddleware,
},
};

View File

@ -1,12 +1,10 @@
const assert = require('assert');
const cron = require('node-schedule');
const cronparser = require('cron-parser');
const promClient = require('prom-client');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { client: cacheClient } = require('../cache');
const Process = require('../process');
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
const { LoggerContext, iterIfError } = require('../utils');
const logger = new LoggerContext({
module: 'BaseTask',
@ -24,11 +22,6 @@ class BaseTask extends Process {
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
this._enableMetrics = options.enableMetrics || false;
this._metricsHost = options.metricsHost || 'localhost';
this._metricsPort = options.metricsPort || 9001;
this._metricsHandlers = null;
this._probeServer = null;
}
async _setup(includeDefaultOpts = true) {
@ -46,75 +39,6 @@ class BaseTask extends Process {
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
.option('-n, --node-id <id>', 'Set a custom node id');
}
if (this._enableMetrics) {
promClient.collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),
};
await this._createProbeServer();
}
}
_registerDefaultMetricHandlers() {
const taskName = this.constructor.name;
// Get the name of our subclass in snake case format eg BaseClass => _base_class
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `s3_utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_attempts_total`,
help: `Total number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_failures_total`,
help: `Total number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
return {
executionDuration,
executionAttempts,
executionFailures,
};
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
return {};
}
async _createProbeServer() {
this._probeServer = await startProbeServer({
bindAddress: this._metricsHost,
port: this._metricsPort,
});
this._probeServer.addHandler(
DEFAULT_METRICS_ROUTE,
(res, log) => {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': promClient.register.contentType,
});
promClient.register.metrics().then(metrics => {
res.end(metrics);
});
},
);
}
get schedule() {
@ -155,23 +79,12 @@ class BaseTask extends Process {
}
async execute() {
let endTimer;
if (this._enableMetrics) {
endTimer = this._metricsHandlers.executionDuration.startTimer();
this._metricsHandlers.executionAttempts.inc(1);
}
try {
const timestamp = new Date() * 1000; // Timestamp in microseconds;
const laggedTimestamp = timestamp - (this.lag * 1000000);
await this._execute(laggedTimestamp);
} catch (error) {
logger.error('Error during task execution', { error });
this._metricsHandlers.executionFailures.inc(1);
}
if (this._enableMetrics) {
endTimer();
}
}
@ -181,9 +94,6 @@ class BaseTask extends Process {
}
async _join() {
if (this._probeServer !== null) {
this._probeServer.stop();
}
return this._cache.disconnect();
}

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { checkpointLagSecs, indexedEventFields } = require('../constants');
@ -10,88 +9,11 @@ const logger = new LoggerContext({
class CreateCheckpoint extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.checkpointPort,
...options,
});
super(options);
this._defaultSchedule = config.checkpointSchedule;
this._defaultLag = checkpointLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_checkpoint_created_total',
help: 'Total number of checkpoints created',
labelNames: ['origin', 'containerName'],
});
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
const lastCheckpoint = new promClient.Gauge({
name: 's3_utapi_create_checkpoint_last_checkpoint_seconds',
help: 'Timestamp of the last successfully created checkpoint',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastCheckpoint();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastCheckpoint,
};
}
/**
* Metrics for CreateCheckpoint
* @typedef {Object} CreateCheckpointMetrics
* @property {number} created - Number of checkpoints created
*/
/**
*
* @param {CreateCheckpointMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastCheckpoint() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.checkpoint.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating checkpoints', { checkpointTimestamp: timestamp });
const status = await this.withWarp10(async warp10 => {
@ -107,7 +29,6 @@ class CreateCheckpoint extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0] || 0} checkpoints`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { snapshotLagSecs } = require('../constants');
@ -10,88 +9,11 @@ const logger = new LoggerContext({
class CreateSnapshot extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.snapshotPort,
...options,
});
super(options);
this._defaultSchedule = config.snapshotSchedule;
this._defaultLag = snapshotLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_create_snapshot_created_total',
help: 'Total number of snapshots created',
labelNames: ['origin', 'containerName'],
});
const getLastSnapshot = this._getLastSnapshot.bind(this);
const lastSnapshot = new promClient.Gauge({
name: 's3_utapi_create_snapshot_last_snapshot_seconds',
help: 'Timestamp of the last successfully created snapshot',
labelNames: ['origin', 'containerName'],
async collect() {
try {
const timestamp = await getLastSnapshot();
if (timestamp !== null) {
this.set(timestamp);
}
} catch (error) {
logger.error('error during metric collection', { error });
}
},
});
return {
created,
lastSnapshot,
};
}
/**
* Metrics for CreateSnapshot
* @typedef {Object} CreateSnapshotMetrics
* @property {number} created - Number of snapshots created
*/
/**
*
* @param {CreateSnapshotMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _getLastSnapshot() {
const resp = await this.withWarp10(async warp10 => warp10.fetch({
className: 'utapi.snapshot.master',
labels: {
node: warp10.nodeId,
},
start: 'now',
stop: -1,
}));
if (!resp.result || (resp.result.length === 0 || resp.result[0] === '' || resp.result[0] === '[]')) {
return null;
}
const result = JSON.parse(resp.result[0])[0];
const timestamp = result.v[0][0];
return timestamp / 1000000;// Convert timestamp from microseconds to seconds
}
async _execute(timestamp) {
logger.debug('creating snapshots', { snapshotTimestamp: timestamp });
@ -107,7 +29,6 @@ class CreateSnapshot extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} snapshots`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -1,7 +1,4 @@
const async = require('async');
const Path = require('path');
const fs = require('fs');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { expirationChunkDuration } = require('../constants');
@ -19,13 +16,9 @@ const ACTION_THRESHOLD = 0.95;
class MonitorDiskUsage extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.diskUsagePort,
...options,
});
super(
options,
);
this._defaultSchedule = config.diskUsageSchedule;
this._defaultLag = 0;
this._path = config.diskUsage.path;
@ -49,88 +42,6 @@ class MonitorDiskUsage extends BaseTask {
);
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const isLocked = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_is_locked',
help: 'Indicates whether the monitored warp 10 has had writes disabled',
labelNames: ['origin', 'containerName'],
});
const leveldbBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_leveldb_bytes',
help: 'Total bytes used by warp 10 leveldb',
labelNames: ['origin', 'containerName'],
});
const datalogBytes = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_datalog_bytes',
help: 'Total bytes used by warp 10 datalog',
labelNames: ['origin', 'containerName'],
});
const hardLimitRatio = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_ratio',
help: 'Percent of the hard limit used by warp 10',
labelNames: ['origin', 'containerName'],
});
const hardLimitSetting = new promClient.Gauge({
name: 's3_utapi_monitor_disk_usage_hard_limit_bytes',
help: 'The hard limit setting in bytes',
labelNames: ['origin', 'containerName'],
});
return {
isLocked,
leveldbBytes,
datalogBytes,
hardLimitRatio,
hardLimitSetting,
};
}
/**
* Metrics for MonitorDiskUsage
* @typedef {Object} MonitorDiskUsageMetrics
* @property {boolean} isLocked - Indicates if writes have been disabled for the monitored warp10
* @property {number} leveldbBytes - Total bytes used by warp 10 leveldb
* @property {number} datalogBytes - Total bytes used by warp 10 datalog
* @property {number} hardLimitRatio - Percent of the hard limit used by warp 10
* @property {number} hardLimitSetting - The hard limit setting in bytes
*/
/**
*
* @param {MonitorDiskUsageMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.isLocked !== undefined) {
this._metricsHandlers.isLocked.set(metrics.isLocked ? 1 : 0);
}
if (metrics.leveldbBytes !== undefined) {
this._metricsHandlers.leveldbBytes.set(metrics.leveldbBytes);
}
if (metrics.datalogBytes !== undefined) {
this._metricsHandlers.datalogBytes.set(metrics.datalogBytes);
}
if (metrics.hardLimitRatio !== undefined) {
this._metricsHandlers.hardLimitRatio.set(metrics.hardLimitRatio);
}
if (metrics.hardLimitSetting !== undefined) {
this._metricsHandlers.hardLimitSetting.set(metrics.hardLimitSetting);
}
}
get isLeader() {
return this._program.leader !== undefined;
}
@ -143,13 +54,9 @@ class MonitorDiskUsage extends BaseTask {
return this._program.lock !== undefined;
}
// eslint-disable-next-line class-methods-use-this
async _getUsage(path) {
moduleLogger.debug(`calculating disk usage for ${path}`);
if (!fs.existsSync(path)) {
throw Error(`failed to calculate usage for non-existent path ${path}`);
}
return getFolderSize(path);
_getUsage() {
moduleLogger.debug(`calculating disk usage for ${this._path}`);
return getFolderSize(this._path);
}
async _expireMetrics(timestamp) {
@ -195,7 +102,7 @@ class MonitorDiskUsage extends BaseTask {
}
_checkHardLimit(size, nodeId) {
const hardPercentage = parseFloat((size / this._hardLimit).toFixed(2));
const hardPercentage = (size / this._hardLimit).toFixed(2);
const hardLimitHuman = formatDiskSize(this._hardLimit);
const hardLogger = moduleLogger.with({
size,
@ -206,8 +113,6 @@ class MonitorDiskUsage extends BaseTask {
nodeId,
});
this._pushMetrics({ hardLimitRatio: hardPercentage });
const msg = `Using ${hardPercentage * 100}% of the ${hardLimitHuman} hard limit on ${nodeId}`;
if (hardPercentage < WARN_THRESHOLD) {
@ -245,14 +150,12 @@ class MonitorDiskUsage extends BaseTask {
if (this.isManualUnlock) {
moduleLogger.info('manually unlocking warp 10', { nodeId: this.nodeId });
await this._enableWarp10Updates();
this._pushMetrics({ isLocked: false });
return;
}
if (this.isManualLock) {
moduleLogger.info('manually locking warp 10', { nodeId: this.nodeId });
await this._disableWarp10Updates();
this._pushMetrics({ isLocked: true });
return;
}
@ -267,21 +170,16 @@ class MonitorDiskUsage extends BaseTask {
return;
}
let leveldbBytes = null;
let datalogBytes = null;
let size = null;
try {
leveldbBytes = await this._getUsage(Path.join(this._path, 'leveldb'));
datalogBytes = await this._getUsage(Path.join(this._path, 'datalog'));
size = await this._getUsage();
} catch (error) {
moduleLogger.error(`error calculating disk usage for ${this._path}`, { error });
return;
}
this._pushMetrics({ leveldbBytes, datalogBytes });
const size = leveldbBytes + datalogBytes;
if (this._hardLimit !== null) {
moduleLogger.info(`warp 10 using ${formatDiskSize(size)} of disk space`, { leveldbBytes, datalogBytes });
moduleLogger.info(`warp 10 leveldb using ${formatDiskSize(size)} of disk space`, { usage: size });
const shouldLock = this._checkHardLimit(size, this.nodeId);
if (shouldLock) {
@ -292,7 +190,6 @@ class MonitorDiskUsage extends BaseTask {
{ nodeId: this.nodeId });
await this._enableWarp10Updates();
}
this._pushMetrics({ isLocked: shouldLock, hardLimitSetting: this._hardLimit });
}
}
}

View File

@ -1,6 +1,5 @@
const assert = require('assert');
const async = require('async');
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const config = require('../config');
@ -17,88 +16,12 @@ const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.ingestPort,
...options,
});
super(options);
this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const ingestedTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_ingest_total',
help: 'Total number of metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedSlow = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_slow_total',
help: 'Total number of slow metrics ingested',
labelNames: ['origin', 'containerName'],
});
const ingestedShards = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_ingest_total',
help: 'Total number of metric shards ingested',
labelNames: ['origin', 'containerName'],
});
const shardAgeTotal = new promClient.Counter({
name: 's3_utapi_ingest_shard_task_shard_age_total',
help: 'Total aggregated age of shards',
labelNames: ['origin', 'containerName'],
});
return {
ingestedTotal,
ingestedSlow,
ingestedShards,
shardAgeTotal,
};
}
/**
* Metrics for IngestShardTask
* @typedef {Object} IngestShardMetrics
* @property {number} ingestedTotal - Number of events ingested
* @property {number} ingestedSlow - Number of slow events ingested
* @property {number} ingestedShards - Number of metric shards ingested
* @property {number} shardAgeTotal - Aggregated age of shards
*/
/**
*
* @param {IngestShardMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.ingestedTotal !== undefined) {
this._metricsHandlers.ingestedTotal.inc(metrics.ingestedTotal);
}
if (metrics.ingestedSlow !== undefined) {
this._metricsHandlers.ingestedSlow.inc(metrics.ingestedSlow);
}
if (metrics.ingestedShards !== undefined) {
this._metricsHandlers.ingestedShards.inc(metrics.ingestedShards);
}
if (metrics.shardAgeTotal !== undefined) {
this._metricsHandlers.shardAgeTotal.inc(metrics.shardAgeTotal);
}
}
_hydrateEvent(data, stripTimestamp = false) {
const event = JSON.parse(data);
if (this._stripEventUUID) {
@ -124,8 +47,6 @@ class IngestShardTask extends BaseTask {
return;
}
let shardAgeTotal = 0;
let ingestedShards = 0;
await async.eachLimit(toIngest, 10,
async shard => {
if (await this._cache.shardExists(shard)) {
@ -163,13 +84,6 @@ class IngestShardTask extends BaseTask {
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
shardAgeTotal += shardAge;
ingestedShards += 1;
this._pushMetrics({ ingestedTotal: records.length });
if (areSlowEvents) {
this._pushMetrics({ ingestedSlow: records.length });
}
} else {
logger.debug('No events found in shard, cleaning up');
}
@ -177,8 +91,6 @@ class IngestShardTask extends BaseTask {
logger.warn('shard does not exist', { shard });
}
});
const shardAgeTotalSecs = shardAgeTotal / 1000000;
this._pushMetrics({ shardAgeTotal: shardAgeTotalSecs, ingestedShards });
}
}

View File

@ -20,23 +20,11 @@ const logger = new LoggerContext({
class ReindexTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.reindexPort,
...options,
});
super(options);
this._defaultSchedule = config.reindexSchedule;
this._defaultLag = 0;
const eventFilters = (config && config.filter) || {};
this._shouldReindex = buildFilterChain((config && config.filter) || {});
// exponential backoff: max wait = 50 * 2 ^ 10 milliseconds ~= 51 seconds
this.ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters);
}
@ -164,6 +152,7 @@ class ReindexTask extends BaseTask {
if (this._program.bucket.length) {
return this._program.bucket.map(name => ({ name }));
}
return metadata.listBuckets();
}
@ -185,8 +174,8 @@ class ReindexTask extends BaseTask {
let mpuTotal;
try {
bktTotal = await async.retryable(this.ebConfig, ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(this.ebConfig, ReindexTask._indexMpuBucket)(mpuBucket);
bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
} catch (error) {
logger.error(
'failed bucket reindex. any associated account will be skipped',

View File

@ -1,4 +1,3 @@
const promClient = require('prom-client');
const BaseTask = require('./BaseTask');
const config = require('../config');
const { LoggerContext } = require('../utils');
@ -10,51 +9,11 @@ const logger = new LoggerContext({
class RepairTask extends BaseTask {
constructor(options) {
super({
enableMetrics: config.metrics.enabled,
metricsHost: config.metrics.host,
metricsPort: config.metrics.repairPort,
...options,
});
super(options);
this._defaultSchedule = config.repairSchedule;
this._defaultLag = repairLagSecs;
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const created = new promClient.Counter({
name: 's3_utapi_repair_task_created_total',
help: 'Total number of repair records created',
labelNames: ['origin', 'containerName'],
});
return {
created,
};
}
/**
* Metrics for RepairTask
* @typedef {Object} RepairMetrics
* @property {number} created - Number of repair records created
*/
/**
*
* @param {RepairMetrics} metrics - Metric values to push
* @returns {undefined}
*/
_pushMetrics(metrics) {
if (!this._enableMetrics) {
return;
}
if (metrics.created !== undefined) {
this._metricsHandlers.created.inc(metrics.created);
}
}
async _execute(timestamp) {
logger.debug('Checking for repairs', { timestamp, nodeId: this.nodeId });
@ -71,7 +30,6 @@ class RepairTask extends BaseTask {
});
if (status.result[0]) {
logger.info(`created ${status.result[0]} corrections`);
this._pushMetrics({ created: status.result[0] });
}
}
}

View File

@ -4,7 +4,6 @@ const timestamp = require('./timestamp');
const func = require('./func');
const disk = require('./disk');
const filter = require('./filter');
const probe = require('./probe');
module.exports = {
...log,
@ -13,5 +12,4 @@ module.exports = {
...func,
...disk,
...filter,
...probe,
};

View File

@ -1,6 +1,14 @@
const werelogs = require('werelogs');
const config = require('../config');
const { comprehend } = require('./func');
const loggerConfig = {
level: config.logging.level,
dump: config.logging.dumpLevel,
};
werelogs.configure(loggerConfig);
const rootLogger = new werelogs.Logger('Utapi');
class LoggerContext {
@ -70,6 +78,8 @@ class LoggerContext {
}
}
rootLogger.debug('logger initialized', { loggerConfig });
function buildRequestLogger(req) {
let reqUids = [];
if (req.headers['x-scal-request-uids'] !== undefined) {

View File

@ -1,32 +0,0 @@
const { ProbeServer } = require('arsenal').network.probe.ProbeServer;
/**
* Configure probe servers
* @typedef {Object} ProbeServerConfig
* @property {string} bindAddress - Address to bind probe server to
* @property {number} port - Port to bind probe server to
*/
/**
* Start an empty probe server
* @async
* @param {ProbeServerConfig} config - Configuration for probe server
* @returns {Promise<ProbeServer>} - Instance of ProbeServer
*/
async function startProbeServer(config) {
if (!config) {
throw new Error('configuration for probe server is missing');
}
return new Promise((resolve, reject) => {
const probeServer = new ProbeServer(config);
probeServer.onListening(() => resolve(probeServer));
probeServer.onError(err => reject(err));
probeServer.start();
});
}
module.exports = {
startProbeServer,
};

View File

@ -1,5 +1,6 @@
const assert = require('assert');
const { auth, policies } = require('arsenal');
const vaultclient = require('vaultclient');
const config = require('../config');
const errors = require('../errors');
/**
@ -8,17 +9,9 @@ const errors = require('../errors');
*/
class VaultWrapper extends auth.Vault {
create(config) {
if (config.vaultd.host) {
return new VaultWrapper(config);
}
return null;
}
constructor(options) {
let client;
const { host, port } = options.vaultd;
const vaultclient = require('vaultclient');
if (options.tls) {
const { key, cert, ca } = options.tls;
client = new vaultclient.Client(host, port, true, key, cert,
@ -76,7 +69,7 @@ class VaultWrapper extends auth.Vault {
request,
request.logger.logger,
(err, authInfo, authRes) => {
if (err && err.is && (err.is.InvalidAccessKeyId || err.is.AccessDenied)) {
if (err && (err.InvalidAccessKeyId || err.AccessDenied)) {
resolve({ authed: false });
return;
}
@ -126,7 +119,7 @@ class VaultWrapper extends auth.Vault {
}
}
const vault = VaultWrapper.create(config);
const vault = new VaultWrapper(config);
auth.setHandler(vault);
module.exports = {

View File

@ -73,11 +73,6 @@ async function authorizeUserAccessKey(authInfo, level, resources, log) {
log.trace('Authorizing IAM user', { resources });
// If no resources were authorized by Vault then no further checking is required
if (resources.length === 0) {
return [false, []];
}
// Get the parent account id from the user's arn
const parentAccountId = authInfo.getArn().split(':')[4];

View File

@ -3,7 +3,7 @@
"engines": {
"node": ">=16"
},
"version": "8.1.15",
"version": "7.10.6",
"description": "API for tracking resource utilization and reporting metrics",
"main": "index.js",
"repository": {
@ -19,12 +19,13 @@
"dependencies": {
"@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.14",
"arsenal": "git+https://git.yourcmc.ru/vitalif/zenko-arsenal.git#development/8.1",
"arsenal": "scality/Arsenal#7.10.9",
"async": "^3.2.0",
"aws-sdk": "^2.1005.0",
"aws4": "^1.8.0",
"backo": "^1.1.0",
"body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient#7.10.2",
"byte-size": "^7.0.0",
"commander": "^5.1.0",
"cron-parser": "^2.15.0",
@ -36,17 +37,18 @@
"level-mem": "^5.0.1",
"needle": "^2.5.0",
"node-schedule": "^1.3.2",
"oas-tools": "^2.2.2",
"prom-client": "14.2.0",
"oas-tools": "^2.1.8",
"prom-client": "^13.1.0",
"uuid": "^3.3.2",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1"
"vaultclient": "scality/vaultclient#7.10.2",
"werelogs": "scality/werelogs#8.1.0"
},
"devDependencies": {
"eslint": "^8.14.0",
"eslint-config-airbnb-base": "^15.0.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
"eslint": "6.0.1",
"eslint-config-airbnb": "17.1.0",
"eslint-config-scality": "scality/Guidelines#7.10.2",
"eslint-plugin-import": "^2.18.0",
"mocha": ">=3.1.2",
"mocha": "^3.0.2",
"nodemon": "^2.0.4",
"protobufjs": "^6.10.1",
"sinon": "^9.0.2"

View File

@ -1,2 +0,0 @@
redis==5.0.3
requests==2.31.0

View File

@ -3,16 +3,17 @@ const assert = require('assert');
const url = require('url');
const { auth, errors, policies } = require('arsenal');
const safeJsonParse = require('../utils/safeJsonParse');
const Vault = require('../lib/Vault');
class Router {
/**
* @constructor
* @param {Config} config - Config instance
*/
constructor(config, vault) {
constructor(config) {
this._service = config.component;
this._routes = {};
this._vault = vault;
this._vault = new Vault(config);
}
/**
@ -265,10 +266,6 @@ class Router {
*/
_processSecurityChecks(utapiRequest, route, cb) {
const log = utapiRequest.getLog();
if (process.env.UTAPI_AUTH === 'false') {
// Zenko route request does not need to go through Vault
return this._startRequest(utapiRequest, route, cb);
}
return this._authSquared(utapiRequest, err => {
if (err) {
log.trace('error from vault', { errors: err });

View File

@ -1,21 +1,4 @@
const fs = require('fs');
const path = require('path');
const Config = require('./lib/Config');
const config = require('./lib/Config');
const server = require('./lib/server');
/*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
const cfgpath = process.env.UTAPI_CONFIG_FILE || (__dirname+'/config.json');
let cfg;
try {
cfg = JSON.parse(fs.readFileSync(cfgpath, { encoding: 'utf-8' }));
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
cfg.component = 's3';
server(new Config(cfg));
server(Object.assign({}, config, { component: 's3' }));

View File

@ -1,14 +1,11 @@
const assert = require('assert');
const fs = require('fs');
const sinon = require('sinon');
const uuid = require('uuid');
const promClient = require('prom-client');
const { clients: warp10Clients } = require('../../../libV2/warp10');
const { MonitorDiskUsage } = require('../../../libV2/tasks');
const { fillDir } = require('../../utils/v2Data');
const { assertMetricValue } = require('../../utils/prom');
// eslint-disable-next-line func-names
describe('Test MonitorDiskUsage hard limit', function () {
@ -18,18 +15,14 @@ describe('Test MonitorDiskUsage hard limit', function () {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(`${path}/datalog`, { recursive: true });
promClient.register.clear();
task = new MonitorDiskUsage({ warp10: warp10Clients, enableMetrics: true });
task = new MonitorDiskUsage({ warp10: warp10Clients });
await task.setup();
task._path = path;
task._enabled = true;
});
afterEach(async () => task.join());
it('should trigger a database lock if above the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1;
const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -41,11 +34,10 @@ describe('Test MonitorDiskUsage hard limit', function () {
assert(lockSpy.calledOnce);
assert(unlockSpy.notCalled);
assert(execStub.calledOnce);
await assertMetricValue('s3_utapi_monitor_disk_usage_hard_limit_bytes', 1);
});
it('should trigger a database unlock if below the limit', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
fillDir(path, { count: 1, size: 100 });
task._hardLimit = 10240;
const checkSpy = sinon.spy(task, '_checkHardLimit');
const lockSpy = sinon.spy(task, '_disableWarp10Updates');
@ -58,7 +50,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
});
it('should not throw when failing to calculate disk usage', async () => {
fillDir(`${path}/leveldb`, { count: 1, size: 100 });
fillDir(path, { count: 1, size: 100 });
task._hardLimit = 1;
sinon.stub(task, '_getUsage').throws();
const _task = task.execute();

View File

@ -1,83 +0,0 @@
const assert = require('assert');
const needle = require('needle');
const promClient = require('prom-client');
const sinon = require('sinon');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { BaseTask } = require('../../../../libV2/tasks');
const { clients: warp10Clients } = require('../../../../libV2/warp10');
const { getMetricValues } = require('../../../utils/prom');
const METRICS_SERVER_PORT = 10999;
class CustomTask extends BaseTask {
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
const foo = new promClient.Gauge({
name: 's3_utapi_custom_task_foo_total',
help: 'Count of foos',
labelNames: ['origin', 'containerName'],
});
return { foo };
}
async _execute() {
this._metricsHandlers.foo.inc(1);
}
}
describe('Test BaseTask metrics', () => {
let task;
beforeEach(async () => {
task = new CustomTask({
enableMetrics: true,
metricsPort: METRICS_SERVER_PORT,
warp10: [warp10Clients[0]],
});
await task.setup();
});
afterEach(async () => {
await task.join();
promClient.register.clear();
});
it('should start a metrics server on the provided port', async () => {
const res = await needle(
'get',
`http://localhost:${METRICS_SERVER_PORT}${DEFAULT_METRICS_ROUTE}`,
);
const lines = res.body.split('\n');
const first = lines[0];
assert.strictEqual(res.statusCode, 200);
assert(first.startsWith('# HELP'));
});
it('should push metrics for a task execution', async () => {
await task.execute();
const timeValues = await getMetricValues('s3_utapi_custom_task_duration_seconds');
assert.strictEqual(timeValues.length, 1);
const attemptsValues = await getMetricValues('s3_utapi_custom_task_attempts_total');
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, []);
});
it('should push metrics for a failed task execution', async () => {
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
await task.execute();
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
});
it('should allow custom handlers to be registered', async () => {
await task.execute();
const fooValues = await getMetricValues('s3_utapi_custom_task_foo_total');
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
});
});

View File

@ -1,5 +1,4 @@
const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
@ -7,7 +6,6 @@ const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint } = require('../../../../libV2/tasks');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
const _now = Math.floor(new Date().getTime() / 1000);
const getTs = delta => convertTimestamp(_now + delta);
@ -51,16 +49,10 @@ describe('Test CreateCheckpoint', function () {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
checkpointTask = new CreateCheckpoint({ warp10: [warp10], enableMetrics: true });
await checkpointTask.setup();
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
checkpointTask._program = { lag: 0, nodeId: prefix };
});
afterEach(async () => {
await checkpointTask.join();
promClient.register.clear();
});
it('should create checkpoints from events', async () => {
const start = getTs(-300);
const stop = getTs(-120);
@ -80,7 +72,6 @@ describe('Test CreateCheckpoint', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_create_checkpoint_created_total', series.length);
});
it('should only include events not in an existing checkpoint', async () => {

View File

@ -1,5 +1,4 @@
const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
@ -7,7 +6,6 @@ const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint, CreateSnapshot, RepairTask } = require('../../../../libV2/tasks');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
const _now = Math.floor(new Date().getTime() / 1000);
const getTs = delta => convertTimestamp(_now + delta);
@ -56,19 +54,13 @@ describe('Test CreateSnapshot', function () {
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
checkpointTask._program = { lag: 0, nodeId: prefix };
snapshotTask = new CreateSnapshot({ warp10: [warp10], enableMetrics: true });
await snapshotTask.setup();
snapshotTask = new CreateSnapshot({ warp10: [warp10] });
snapshotTask._program = { lag: 0, nodeId: prefix };
repairTask = new RepairTask({ warp10: [warp10] });
repairTask._program = { lag: 0, nodeId: prefix };
});
afterEach(async () => {
await snapshotTask.join();
promClient.register.clear();
});
it('should create a snapshot from a checkpoint', async () => {
const start = getTs(-300);
const stop = getTs(-120);
@ -88,7 +80,6 @@ describe('Test CreateSnapshot', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_create_snapshot_created_total', series.length);
});
it('should create a snapshot from more than one checkpoint', async () => {

View File

@ -1,19 +1,15 @@
const assert = require('assert');
const fs = require('fs');
const promClient = require('prom-client');
const uuid = require('uuid');
const { MonitorDiskUsage } = require('../../../../libV2/tasks');
const { getFolderSize } = require('../../../../libV2/utils');
const { fillDir } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
class MonitorDiskUsageShim extends MonitorDiskUsage {
async _getUsage(path) {
const usage = await super._getUsage(path);
this.usage = (this.usage || 0) + usage;
return usage;
async _getUsage() {
this.usage = await super._getUsage();
return this.usage;
}
}
@ -47,35 +43,17 @@ describe('Test MonitorDiskUsage', () => {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
fs.mkdirSync(path);
task = new MonitorDiskUsageShim({ warp10: [], enableMetrics: true });
task = new MonitorDiskUsageShim({ warp10: [] });
task._path = path;
task._enabled = true;
await task.setup();
});
afterEach(async () => {
await task.join();
promClient.register.clear();
});
testCases.map(testCase => {
testCases.map(testCase =>
it(`should calculate disk usage for ${testCase.count} files of ${testCase.size} bytes each`,
async () => {
fillDir(`${path}/leveldb`, testCase);
fillDir(`${path}/datalog`, testCase);
fillDir(path, testCase);
await task._execute();
const expectedSingleSize = emptyDirSize + testCase.expected + (emptyFileSize * testCase.count);
const expectedTotalSize = expectedSingleSize * 2;
assert.strictEqual(task.usage, expectedTotalSize);
// Should equal the usage minus the empty datalog
await assertMetricValue('s3_utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
await assertMetricValue('s3_utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
});
});
it('should fail if a subpath does not exist', () => {
assert.doesNotThrow(() => task._execute());
assert.strictEqual(task.usage, undefined);
});
assert.strictEqual(task.usage, testCase.expected + emptyDirSize + (emptyFileSize * testCase.count));
}));
});

View File

@ -1,5 +1,4 @@
const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid');
const { CacheClient, backends: cacheBackends } = require('../../../../libV2/cache');
@ -10,7 +9,6 @@ const config = require('../../../../libV2/config');
const { eventFieldsToWarp10 } = require('../../../../libV2/constants');
const { generateFakeEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue, getMetricValues } = require('../../../utils/prom');
const _now = Math.floor(new Date().getTime() / 1000);
const getTs = delta => convertTimestamp(_now + delta);
@ -60,21 +58,17 @@ describe('Test IngestShards', function () {
await cacheClient.connect();
warp10 = new Warp10Client({ nodeId: prefix });
ingestTask = new IngestShard({ warp10: [warp10], enableMetrics: true });
await ingestTask.setup();
ingestTask = new IngestShard({ warp10: [warp10] });
ingestTask._cache._cacheBackend._prefix = prefix;
ingestTask._program = { lag: 0 };
await ingestTask._cache.connect();
});
afterEach(async () => {
this.afterEach(async () => {
await cacheClient.disconnect();
await ingestTask._cache.disconnect();
await ingestTask.join();
promClient.register.clear();
});
it('should ingest metrics from a single shard', async () => {
const start = shardFromTimestamp(getTs(-120));
const stop = start + 9000000;
@ -92,12 +86,6 @@ describe('Test IngestShards', function () {
'@utapi/decodeEvent',
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
await assertMetricValue('s3_utapi_ingest_shard_task_shard_ingest_total', 1);
const metricValues = await getMetricValues('s3_utapi_ingest_shard_task_shard_age_total');
assert.strictEqual(metricValues.length, 1);
const [metric] = metricValues;
assert(metric.value > 0);
});
it('should ingest metrics for multiple shards', async () => {
@ -118,7 +106,6 @@ describe('Test IngestShards', function () {
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
});
it('should ingest old metrics as repair', async () => {
@ -138,7 +125,6 @@ describe('Test IngestShards', function () {
'@utapi/decodeEvent',
);
assertResults(events, series);
await assertMetricValue('s3_utapi_ingest_shard_task_slow_total', events.length);
});
it('should strip the event uuid during ingestion', async () => {
@ -170,6 +156,7 @@ describe('Test IngestShards', function () {
const results = await warp10.fetch({
className: 'utapi.event', labels: { node: prefix }, start: start + 1, stop: -2,
});
const series = JSON.parse(results.result[0])[0];
const timestamps = series.v.map(ev => ev[0]);
assert.deepStrictEqual([
@ -178,8 +165,7 @@ describe('Test IngestShards', function () {
], timestamps);
});
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65
it.skip('should increment microseconds for several duplicate timestamps', async () => {
it('should increment microseconds for several duplicate timestamps', async () => {
const start = shardFromTimestamp(getTs(-120));
const events = generateFakeEvents(start, start + 5, 5)
.map(ev => { ev.timestamp = start; return ev; });
@ -190,6 +176,7 @@ describe('Test IngestShards', function () {
const results = await warp10.fetch({
className: 'utapi.event', labels: { node: prefix }, start: start + 5, stop: -5,
});
const series = JSON.parse(results.result[0])[0];
const timestamps = series.v.map(ev => ev[0]);
assert.deepStrictEqual([
@ -222,3 +209,4 @@ describe('Test IngestShards', function () {
], timestamps);
});
});

View File

@ -177,49 +177,4 @@ describe('Test ReindexTask', function () {
assert.strictEqual(series[0].values.length, 2);
series[0].values.map(value => assert.deepStrictEqual(value, bucketRecord));
});
describe('exponential backoff', () => {
it('should retry when bucketd is unreachable', done => {
// disable bucketd to simulate downtime
bucketd.end();
const bucketDStub = sinon.stub(bucketd, '_getBucketResponse');
bucketDStub.onFirstCall().callsFake(
// Once the timeout promise resolves, bucketd is able to be called.
// If we make a call after 10 seconds, this shows that retries
// have been occuring during bucketd downtime.
() => {
return {
key: 'foo',
value: 'bar',
};
},
);
const reindexPromise = new Promise((resolve, reject) => {
reindexTask._execute()
.then(() => {
resolve('reindexed');
})
.catch(err => {
reject(err);
});
});
const timeoutPromise = new Promise(resolve => {
const f = () => {
bucketd.start();
resolve();
};
setTimeout(f, 10000);
});
Promise.all([reindexPromise, timeoutPromise])
.then(values => {
assert.strictEqual(values[0], 'reindexed');
sinon.restore();
done();
});
});
});
});

View File

@ -1,5 +1,4 @@
const assert = require('assert');
const promClient = require('prom-client');
const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
@ -7,7 +6,6 @@ const { convertTimestamp } = require('../../../../libV2/utils');
const { RepairTask } = require('../../../../libV2/tasks');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
const { assertMetricValue } = require('../../../utils/prom');
// Ten minutes in the past
const _now = Math.floor(new Date().getTime() / 1000) - (600);
@ -51,16 +49,10 @@ describe('Test Repair', function () {
beforeEach(async () => {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
repairTask = new RepairTask({ warp10: [warp10], enableMetrics: true });
await repairTask.setup();
repairTask = new RepairTask({ warp10: [warp10] });
repairTask._program = { lag: 0, nodeId: prefix };
});
afterEach(async () => {
await repairTask.join();
promClient.register.clear();
});
it('should create corrections from events', async () => {
const start = getTs(-300);
const stop = getTs(-120);
@ -80,7 +72,6 @@ describe('Test Repair', function () {
assert.strictEqual(series.length, 3);
assertResults(totals, series);
await assertMetricValue('s3_utapi_repair_task_created_total', series.length);
});
it('should only include events not in an existing correction', async () => {

View File

@ -233,8 +233,7 @@ describe('Test UtapiClient', function () {
});
});
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65
it.skip('should get the current storage for an account using the cache', async () => {
it('should get the current storage for an account using the cache', async () => {
await async.eachOf(totals.accounts, async (total, acc) => {
cacheClient.updateAccountCounterBase(acc, total.bytes);
});

View File

@ -1,5 +1,6 @@
/* eslint-disable implicit-arrow-linebreak */
const assert = require('assert');
const { errors } = require('arsenal');
const { Logger } = require('werelogs');
const MemoryBackend = require('../../lib/backend/Memory');
const Datastore = require('../../lib/Datastore');
@ -39,7 +40,7 @@ function getMetricResponse(schemaKey) {
return response;
}
function assertMetrics(schemaKey, metricName, props, done) {
function assertMetrics(schemaKey, metricName, props, isNegativeValue, done) {
const timestamp = new Date().setMinutes(0, 0, 0);
const timeRange = [timestamp, timestamp];
const expectedRes = getMetricResponse(schemaKey);
@ -51,6 +52,17 @@ function assertMetrics(schemaKey, metricName, props, done) {
datastore,
logger,
(err, res) => {
if (isNegativeValue) {
assert.deepStrictEqual(
err,
errors.InternalError.customizeDescription(
'Utapi is in a transient state for this time period as '
+ 'metrics are being collected. Please try again in a few '
+ 'minutes.',
),
);
return done();
}
assert.strictEqual(err, null);
// overwrite operations metrics
if (expectedResProps.operations) {
@ -88,12 +100,13 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
if (keyIndex === 'storageUtilized' || keyIndex === 'numberOfObjects') {
key = generateStateKey(schemaObject, keyIndex);
val = isNegativeValue ? -1024 : 1024;
props[metricindex] = isNegativeValue ? [0, 0] : [val, val];
props[metricindex] = [val, val];
memBackend.zadd(key, timestamp, val, () =>
assertMetrics(
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
} else if (keyIndex === 'incomingBytes' || keyIndex === 'outgoingBytes') {
@ -105,6 +118,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
} else {
@ -117,6 +131,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
schemaKey,
schemaObject[schemaKey],
props,
isNegativeValue,
done,
));
}
@ -132,6 +147,7 @@ Object.keys(metricLevels).forEach(schemaKey => {
schemaKey,
resourceNames[schemaKey],
null,
false,
done,
));

View File

@ -21,9 +21,6 @@ const config = {
localCache: redisLocal,
component: 's3',
};
const location = 'foo-backend';
const incrby = 100;
const decrby = -30;
function isSortedSetKey(key) {
return key.endsWith('storageUtilized') || key.endsWith('numberOfObjects');
@ -79,29 +76,6 @@ function setMockData(data, timestamp, cb) {
return cb();
}
function getLocationObject(bytesValue) {
const obj = {};
obj[`s3:location:${location}:locationStorage`] = `${bytesValue}`;
return obj;
}
function testLocationMetric(c, params, expected, cb) {
const { location, updateSize } = params;
if (updateSize) {
c.pushLocationMetric(location, updateSize, REQUID, err => {
assert.equal(err, null);
assert.deepStrictEqual(memoryBackend.data, expected);
return cb();
});
} else {
c.getLocationMetric(location, REQUID, (err, bytesStored) => {
assert.equal(err, null);
assert.strictEqual(bytesStored, expected);
return cb();
});
}
}
describe('UtapiClient:: enable/disable client', () => {
it('should disable client when no redis config is provided', () => {
const c = new UtapiClient();
@ -273,11 +247,7 @@ tests.forEach(test => {
c.setDataStore(ds);
c.pushMetric(metric, REQUID, params, () => {
deserializeMemoryBackend(memoryBackend.data);
Object.keys(expected).forEach(key => {
if (memoryBackend.data[key]) {
assert.deepStrictEqual(memoryBackend.data[key], expected[key]);
}
});
assert.deepStrictEqual(memoryBackend.data, expected);
return cb();
});
}
@ -520,7 +490,6 @@ tests.forEach(test => {
storageUtilized: '1024',
numberOfObjects: '1',
};
setMockData(data, timestamp, () => {
testMetric('deleteObject', params, expected, done);
});
@ -698,40 +667,6 @@ tests.forEach(test => {
testMetric('putDeleteMarkerObject', metricTypes, expected, done);
});
it('should push putDeleteMarkerObject metrics and have correct bytes and number of objects', done => {
const expected = buildExpectedResult({
action: 'PutObject',
numberOfObjects: '1',
});
const metrics = {
bucket: '5741-repro',
keys: ['foo2'],
byteLength: undefined,
newByteLength: 258,
oldByteLength: null,
numberOfObjects: 1,
accountId: '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
userId: undefined,
};
testMetric('putObject', Object.assign(metrics, metricTypes), expected, () => {
const expected = buildExpectedResult({
action: 'DeleteObject',
numberOfObjects: '1',
});
const metrics2 = {
bucket: '5741-repro',
keys: ['foo2'],
byteLength: 258,
newByteLength: undefined,
oldByteLength: undefined,
numberOfObjects: undefined,
accountId: '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
userId: undefined,
};
testMetric('putDeleteMarkerObject', Object.assign(metrics2, metricTypes), expected, done);
});
});
it('should push putBucketReplication metrics', done => {
const expected = buildExpectedResult({
action: 'PutBucketReplication',
@ -831,27 +766,3 @@ tests.forEach(test => {
});
});
});
describe('UtapiClient:: location quota metrics', () => {
beforeEach(function beFn() {
this.currentTest.c = new UtapiClient(config);
this.currentTest.c.setDataStore(ds);
});
afterEach(() => memoryBackend.flushDb());
it('should increment location metric', function itFn(done) {
const expected = getLocationObject(incrby);
testLocationMetric(this.test.c, { location, updateSize: incrby },
expected, done);
});
it('should decrement location metric', function itFn(done) {
const expected = getLocationObject(decrby);
testLocationMetric(this.test.c, { location, updateSize: decrby },
expected, done);
});
it('should list location metric', function itFn(done) {
const expected = 0;
testLocationMetric(this.test.c, { location }, expected, done);
});
});

View File

@ -10,7 +10,6 @@ const templateExpected = opts => ({
operationId: 'healthcheck',
tag: 'internal',
encrypted: false,
requestTimer: null,
...(opts || {}),
});

View File

@ -1,11 +1,8 @@
const assert = require('assert');
const sinon = require('sinon');
const promClient = require('prom-client');
const { middleware } = require('../../../../libV2/server/middleware');
const { templateRequest, ExpressResponseStub } = require('../../../utils/v2Data');
const RequestContext = require('../../../../libV2/models/RequestContext');
const { getMetricValues, assertMetricValue } = require('../../../utils/prom');
describe('Test middleware', () => {
it('should build a request logger', next => {
@ -89,56 +86,4 @@ describe('Test middleware', () => {
}));
});
});
describe('test httpMetricsMiddleware', () => {
let resp;
beforeEach(() => {
resp = new ExpressResponseStub();
resp.status(200);
});
afterEach(() => {
promClient.register.clear();
});
it('should increment the counter if not an internal route', async () => {
const req = templateRequest({
swagger: {
operation: {
'x-router-controller': 'metrics',
'operationId': 'listMetrics',
},
},
});
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
await assertMetricValue('s3_utapi_http_requests_total', 1);
const durationMetric = 's3_utapi_http_request_duration_seconds';
const duration = await getMetricValues(durationMetric);
// 14 defined buckets + 1 for Infinity
assert.strictEqual(
duration.filter(metric => metric.metricName === `${durationMetric}_bucket`).length,
15,
);
const count = duration.filter(metric => metric.metricName === `${durationMetric}_count`);
assert.deepStrictEqual(count, [{
labels: {
action: 'listMetrics',
code: 200,
},
metricName: `${durationMetric}_count`,
value: 1,
}]);
assert.strictEqual(count[0].value, 1);
});
it('should not increment the counter if an internal route', async () => {
const req = templateRequest();
req.ctx = new RequestContext(req);
middleware.httpMetricsMiddleware(req, resp);
assert.rejects(() => getMetricValues('s3_utapi_http_requests_total'));
});
});
});

View File

@ -112,17 +112,6 @@ class BucketD {
return body;
}
_getBucketVersionResponse(bucketName) {
const body = {
CommonPrefixes: [],
IsTruncated: false,
Versions: (this._bucketContent[bucketName] || [])
// patch in a versionId to more closely match the real response
.map(entry => ({ ...entry, versionId: 'null' })),
};
return body;
}
_getShadowBucketOverviewResponse(bucketName) {
const mpus = (this._bucketContent[bucketName] || []).map(o => ({
key: o.key,
@ -148,8 +137,6 @@ class BucketD {
|| req.query.listingType === 'Delimiter'
) {
req.body = this._getBucketResponse(bucketName);
} else if (req.query.listingType === 'DelimiterVersions') {
req.body = this._getBucketVersionResponse(bucketName);
}
// v2 reindex uses `Basic` listing type for everything

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const assert = require('assert');
async function getMetricValues(name) {
const metric = await promClient.register.getSingleMetric(name);
const data = await metric.get();
return data.values;
}
async function assertMetricValue(name, value) {
const values = await getMetricValues(name);
assert.strictEqual(values.length, 1);
const [metric] = values;
assert.strictEqual(metric.value, value);
}
module.exports = {
getMetricValues,
assertMetricValue,
};

View File

@ -115,16 +115,7 @@
%> FOREACH
%>
<%
// If no new events were found
// - drop the empty list
// - write a new master checkpoint
// - return 0
DROP
NEWGTS $master_checkpoint_class RENAME
$endTimestamp NaN NaN NaN 0 ADDVALUE
{ 'node' $nodeId } RELABEL
$write_token UPDATE
0 STOP
DROP 0 STOP
%> IFTE
0 'checkpoints' STORE

4789
yarn.lock Normal file

File diff suppressed because it is too large Load Diff