Compare commits
4 Commits
developmen
...
user/tmacr
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 13b00077b3 | |
Taylor McKinnon | e99719f0c7 | |
Taylor McKinnon | 553744d270 | |
Taylor McKinnon | af9d283210 |
|
@ -1,87 +0,0 @@
|
|||
# General support information
|
||||
|
||||
GitHub Issues are **reserved** for actionable bug reports (including
|
||||
documentation inaccuracies), and feature requests.
|
||||
**All questions** (regarding configuration, usecases, performance, community,
|
||||
events, setup and usage recommendations, among other things) should be asked on
|
||||
the **[Zenko Forum](http://forum.zenko.io/)**.
|
||||
|
||||
> Questions opened as GitHub issues will systematically be closed, and moved to
|
||||
> the [Zenko Forum](http://forum.zenko.io/).
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
## Avoiding duplicates
|
||||
|
||||
When reporting a new issue/requesting a feature, make sure that we do not have
|
||||
any duplicates already open:
|
||||
|
||||
- search the issue list for this repository (use the search bar, select
|
||||
"Issues" on the left pane after searching);
|
||||
- if there is a duplicate, please do not open your issue, and add a comment
|
||||
to the existing issue instead.
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
## Bug report information
|
||||
|
||||
(delete this section (everything between the lines) if you're not reporting a
|
||||
bug but requesting a feature)
|
||||
|
||||
### Description
|
||||
|
||||
Briefly describe the problem you are having in a few paragraphs.
|
||||
|
||||
### Steps to reproduce the issue
|
||||
|
||||
Please provide steps to reproduce, including full log output
|
||||
|
||||
### Actual result
|
||||
|
||||
Describe the results you received
|
||||
|
||||
### Expected result
|
||||
|
||||
Describe the results you expected
|
||||
|
||||
### Additional information
|
||||
|
||||
- Node.js version,
|
||||
- Docker version,
|
||||
- npm version,
|
||||
- distribution/OS,
|
||||
- optional: anything else you deem helpful to us.
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
## Feature Request
|
||||
|
||||
(delete this section (everything between the lines) if you're not requesting
|
||||
a feature but reporting a bug)
|
||||
|
||||
### Proposal
|
||||
|
||||
Describe the feature
|
||||
|
||||
### Current behavior
|
||||
|
||||
What currently happens
|
||||
|
||||
### Desired behavior
|
||||
|
||||
What you would like to happen
|
||||
|
||||
### Usecase
|
||||
|
||||
Please provide usecases for changing the current behavior
|
||||
|
||||
### Additional information
|
||||
|
||||
- Is this request for your company? Y/N
|
||||
- If Y: Company name:
|
||||
- Are you using any Scality Enterprise Edition products (RING, Zenko EE)? Y/N
|
||||
- Are you willing to contribute this feature yourself?
|
||||
- Position/Title:
|
||||
- How did you hear about us?
|
||||
|
||||
--------------------------------------------------------------------------------
|
|
@ -1,7 +1,8 @@
|
|||
FROM ghcr.io/scality/vault:c2607856
|
||||
FROM registry.scality.com/vault-dev/vault:c2607856
|
||||
|
||||
ENV VAULT_DB_BACKEND LEVELDB
|
||||
|
||||
RUN chmod 400 tests/utils/keyfile
|
||||
|
||||
ENTRYPOINT yarn start
|
||||
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
name: build-ci-images
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
|
||||
jobs:
|
||||
warp10-ci:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
secrets:
|
||||
REGISTRY_LOGIN: ${{ github.repository_owner }}
|
||||
REGISTRY_PASSWORD: ${{ github.token }}
|
||||
with:
|
||||
name: warp10-ci
|
||||
context: .
|
||||
file: images/warp10/Dockerfile
|
||||
lfs: true
|
||||
|
||||
redis-ci:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
secrets:
|
||||
REGISTRY_LOGIN: ${{ github.repository_owner }}
|
||||
REGISTRY_PASSWORD: ${{ github.token }}
|
||||
with:
|
||||
name: redis-ci
|
||||
context: .
|
||||
file: images/redis/Dockerfile
|
||||
|
||||
redis-replica-ci:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
needs:
|
||||
- redis-ci
|
||||
secrets:
|
||||
REGISTRY_LOGIN: ${{ github.repository_owner }}
|
||||
REGISTRY_PASSWORD: ${{ github.token }}
|
||||
with:
|
||||
name: redis-replica-ci
|
||||
context: .github/docker/redis-replica
|
||||
build-args: |
|
||||
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
|
||||
|
||||
vault-ci:
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
lfs: true
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v3
|
||||
- name: Login to GitHub Registry
|
||||
uses: docker/login-action@v3
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ github.token }}
|
||||
|
||||
- name: Build and push vault Image
|
||||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
push: true
|
||||
context: .github/docker/vault
|
||||
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
|
||||
cache-from: type=gha,scope=vault
|
||||
cache-to: type=gha,mode=max,scope=vault
|
|
@ -1,16 +0,0 @@
|
|||
name: build-dev-image
|
||||
|
||||
on:
|
||||
push:
|
||||
branches-ignore:
|
||||
- 'development/**'
|
||||
|
||||
jobs:
|
||||
build-dev:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
secrets:
|
||||
REGISTRY_LOGIN: ${{ github.repository_owner }}
|
||||
REGISTRY_PASSWORD: ${{ github.token }}
|
||||
with:
|
||||
namespace: ${{ github.repository_owner }}
|
||||
name: ${{ github.event.repository.name }}
|
|
@ -1,39 +0,0 @@
|
|||
name: release-warp10
|
||||
|
||||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
tag:
|
||||
type: string
|
||||
description: 'Tag to be released'
|
||||
required: true
|
||||
create-github-release:
|
||||
type: boolean
|
||||
description: Create a tag and matching Github release.
|
||||
required: false
|
||||
default: true
|
||||
|
||||
jobs:
|
||||
build:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
secrets: inherit
|
||||
with:
|
||||
name: warp10
|
||||
context: .
|
||||
file: images/warp10/Dockerfile
|
||||
tag: ${{ github.event.inputs.tag }}
|
||||
lfs: true
|
||||
|
||||
release:
|
||||
if: ${{ inputs.create-github-release }}
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
steps:
|
||||
- uses: softprops/action-gh-release@v2
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
with:
|
||||
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
|
||||
tag_name: ${{ github.event.inputs.tag }}-warp10
|
||||
generate_release_notes: false
|
||||
target_commitish: ${{ github.sha }}
|
|
@ -3,43 +3,44 @@ name: release
|
|||
on:
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
dockerfile:
|
||||
description: Dockerfile to build image from
|
||||
type: choice
|
||||
options:
|
||||
- images/nodesvc-base/Dockerfile
|
||||
- Dockerfile
|
||||
required: true
|
||||
tag:
|
||||
type: string
|
||||
description: 'Tag to be released'
|
||||
required: true
|
||||
create-github-release:
|
||||
type: boolean
|
||||
description: Create a tag and matching Github release.
|
||||
required: false
|
||||
default: false
|
||||
|
||||
jobs:
|
||||
build:
|
||||
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
|
||||
with:
|
||||
namespace: ${{ github.repository_owner }}
|
||||
name: ${{ github.event.repository.name }}
|
||||
context: .
|
||||
file: ${{ github.event.inputs.dockerfile}}
|
||||
tag: ${{ github.event.inputs.tag }}
|
||||
|
||||
release:
|
||||
if: ${{ inputs.create-github-release }}
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
steps:
|
||||
- uses: softprops/action-gh-release@v2
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2
|
||||
|
||||
- name: Set up QEMU
|
||||
uses: docker/setup-qemu-action@v1
|
||||
|
||||
- name: Set up Docker Buildk
|
||||
uses: docker/setup-buildx-action@v1
|
||||
|
||||
- name: Login to Registry
|
||||
uses: docker/login-action@v1
|
||||
with:
|
||||
registry: registry.scality.com
|
||||
username: ${{ secrets.REGISTRY_LOGIN }}
|
||||
password: ${{ secrets.REGISTRY_PASSWORD }}
|
||||
|
||||
- name: Build and push utapi image
|
||||
uses: docker/build-push-action@v2
|
||||
with:
|
||||
context: .
|
||||
push: true
|
||||
tags: "registry.scality.com/utapi/utapi:${{ github.event.inputs.tag }}"
|
||||
|
||||
- name: Create Release
|
||||
uses: softprops/action-gh-release@v1
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ github.token }}
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
with:
|
||||
name: Release ${{ github.event.inputs.tag }}
|
||||
tag_name: ${{ github.event.inputs.tag }}
|
||||
generate_release_notes: true
|
||||
target_commitish: ${{ github.sha }}
|
||||
|
||||
|
|
|
@ -4,32 +4,83 @@ name: tests
|
|||
on:
|
||||
push:
|
||||
branches-ignore:
|
||||
- 'development/**'
|
||||
- 'development/**'
|
||||
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
debug:
|
||||
description: Debug (enable the ability to SSH to runners)
|
||||
type: boolean
|
||||
required: false
|
||||
default: 'false'
|
||||
connection-timeout-m:
|
||||
type: number
|
||||
required: false
|
||||
description: Timeout for ssh connection to worker (minutes)
|
||||
default: 30
|
||||
jobs:
|
||||
build-ci:
|
||||
uses: ./.github/workflows/build-ci.yaml
|
||||
build:
|
||||
runs-on: ubuntu-20.04
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v2.3.4
|
||||
with:
|
||||
lfs: true
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v1.6.0
|
||||
|
||||
- name: Login to GitHub Registry
|
||||
uses: docker/login-action@v1.10.0
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: ${{ github.repository_owner }}
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Login to Scality Registry
|
||||
uses: docker/login-action@v1.10.0
|
||||
with:
|
||||
registry: registry.scality.com
|
||||
username: ${{ secrets.REGISTRY_LOGIN }}
|
||||
password: ${{ secrets.REGISTRY_PASSWORD }}
|
||||
|
||||
- name: Build and push redis CI image
|
||||
uses: docker/build-push-action@v2.7.0
|
||||
with:
|
||||
push: true
|
||||
file: images/redis/Dockerfile
|
||||
context: '.'
|
||||
tags: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
|
||||
cache-from: type=gha,scope=redis
|
||||
cache-to: type=gha,mode=max,scope=redis
|
||||
|
||||
- name: Build and push redis replica CI image
|
||||
uses: docker/build-push-action@v2.7.0
|
||||
with:
|
||||
push: true
|
||||
context: .github/docker/redis-replica
|
||||
build-args: |
|
||||
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
|
||||
tags: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
|
||||
cache-from: type=gha,scope=redis-replica
|
||||
cache-to: type=gha,mode=max,scope=redis-replica
|
||||
|
||||
- name: Build and push warp10 Image
|
||||
uses: docker/build-push-action@v2.7.0
|
||||
with:
|
||||
push: true
|
||||
file: images/warp10/Dockerfile
|
||||
context: '.'
|
||||
tags: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
|
||||
cache-from: type=gha,scope=warp10
|
||||
cache-to: type=gha,mode=max,scope=warp10
|
||||
|
||||
- name: Build and push vault Image
|
||||
uses: docker/build-push-action@v2.7.0
|
||||
with:
|
||||
push: true
|
||||
context: '.github/docker/vault'
|
||||
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
|
||||
cache-from: type=gha,scope=vault
|
||||
cache-to: type=gha,mode=max,scope=vault
|
||||
|
||||
|
||||
lint:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
lfs: true
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/setup-node@v2
|
||||
with:
|
||||
node-version: '16.13.2'
|
||||
cache: yarn
|
||||
|
@ -40,9 +91,8 @@ jobs:
|
|||
- name: run static analysis tools on code
|
||||
run: yarn run lint
|
||||
|
||||
tests-v1:
|
||||
needs:
|
||||
- build-ci
|
||||
tests:
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
REINDEX_PYTHON_INTERPRETER: python3
|
||||
|
@ -55,16 +105,16 @@ jobs:
|
|||
command: yarn test
|
||||
env:
|
||||
UTAPI_METRICS_ENABLED: 'true'
|
||||
- name: run v1 client tests
|
||||
- name: run client tests
|
||||
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
|
||||
env: {}
|
||||
- name: run v1 server tests
|
||||
- name: run server tests
|
||||
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:server
|
||||
env: {}
|
||||
- name: run v1 cron tests
|
||||
- name: run cron tests
|
||||
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:cron
|
||||
env: {}
|
||||
- name: run v1 interval tests
|
||||
- name: run interval tests
|
||||
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:interval
|
||||
env: {}
|
||||
services:
|
||||
|
@ -88,7 +138,7 @@ jobs:
|
|||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
redis-sentinel:
|
||||
image: bitnami/redis-sentinel:7.2.4
|
||||
image: bitnami/redis-sentinel:6.2
|
||||
env:
|
||||
REDIS_MASTER_SET: scality-s3
|
||||
REDIS_SENTINEL_PORT_NUMBER: '16379'
|
||||
|
@ -119,133 +169,32 @@ jobs:
|
|||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
lfs: true
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/setup-node@v2
|
||||
with:
|
||||
node-version: '16.13.2'
|
||||
cache: yarn
|
||||
- uses: actions/setup-python@v5
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.9'
|
||||
cache: pip
|
||||
- uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/.cache/pip
|
||||
key: ${{ runner.os }}-pip
|
||||
- name: Install python deps
|
||||
run: pip install -r requirements.txt
|
||||
run: |
|
||||
pip install requests
|
||||
pip install redis
|
||||
- name: install dependencies
|
||||
run: yarn install --frozen-lockfile --network-concurrency 1
|
||||
- name: ${{ matrix.test.name }}
|
||||
run: ${{ matrix.test.command }}
|
||||
env: ${{ matrix.test.env }}
|
||||
|
||||
tests-v2-with-vault:
|
||||
needs:
|
||||
- build-ci
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
REINDEX_PYTHON_INTERPRETER: python3
|
||||
services:
|
||||
redis:
|
||||
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
|
||||
ports:
|
||||
- 6379:6379
|
||||
- 9121:9121
|
||||
options: >-
|
||||
--health-cmd "redis-cli ping"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
redis-replica:
|
||||
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
|
||||
ports:
|
||||
- 6380:6380
|
||||
options: >-
|
||||
--health-cmd "redis-cli -p 6380 ping"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
redis-sentinel:
|
||||
image: bitnami/redis-sentinel:7.2.4
|
||||
env:
|
||||
REDIS_MASTER_SET: scality-s3
|
||||
REDIS_SENTINEL_PORT_NUMBER: '16379'
|
||||
REDIS_SENTINEL_QUORUM: '1'
|
||||
ports:
|
||||
- 16379:16379
|
||||
options: >-
|
||||
--health-cmd "redis-cli -p 16379 ping"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
warp10:
|
||||
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
|
||||
env:
|
||||
standalone.port: '4802'
|
||||
warpscript.maxops: '10000000'
|
||||
ENABLE_SENSISION: 't'
|
||||
ports:
|
||||
- 4802:4802
|
||||
- 8082:8082
|
||||
- 9718:9718
|
||||
options: >-
|
||||
--health-cmd "curl localhost:4802/api/v0/check"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 10
|
||||
--health-start-period 60s
|
||||
vault:
|
||||
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
|
||||
ports:
|
||||
- 8500:8500
|
||||
- 8600:8600
|
||||
- 8700:8700
|
||||
- 8800:8800
|
||||
options: >-
|
||||
--health-cmd "curl http://localhost:8500/_/healthcheck"
|
||||
--health-interval 10s
|
||||
--health-timeout 5s
|
||||
--health-retries 10
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
lfs: true
|
||||
- uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: '16.13.2'
|
||||
cache: yarn
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.9'
|
||||
cache: pip
|
||||
- name: Install python deps
|
||||
run: pip install -r requirements.txt
|
||||
- name: install dependencies
|
||||
run: yarn install --frozen-lockfile --network-concurrency 1
|
||||
- name: Wait for warp10 for 60 seconds
|
||||
run: sleep 60
|
||||
- name: run v2 functional tests
|
||||
run: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
|
||||
env:
|
||||
UTAPI_CACHE_BACKEND: redis
|
||||
UTAPI_SERVICE_USER_ENABLED: 'true'
|
||||
UTAPI_LOG_LEVEL: trace
|
||||
SETUP_CMD: "run start_v2:server"
|
||||
- name: 'Debug: SSH to runner'
|
||||
uses: scality/actions/action-ssh-to-runner@1.7.0
|
||||
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
|
||||
continue-on-error: true
|
||||
with:
|
||||
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
|
||||
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
|
||||
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
|
||||
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
|
||||
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
|
||||
|
||||
tests-v2-without-sensision:
|
||||
needs:
|
||||
- build-ci
|
||||
tests-with-vault:
|
||||
needs: build
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
REINDEX_PYTHON_INTERPRETER: python3
|
||||
|
@ -254,6 +203,13 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
test:
|
||||
- name: run v2 functional tests
|
||||
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
|
||||
env:
|
||||
UTAPI_CACHE_BACKEND: redis
|
||||
UTAPI_SERVICE_USER_ENABLED: 'true'
|
||||
UTAPI_LOG_LEVEL: trace
|
||||
SETUP_CMD: "run start_v2:server"
|
||||
- name: run v2 soft limit test
|
||||
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:softLimit
|
||||
env:
|
||||
|
@ -287,7 +243,7 @@ jobs:
|
|||
--health-timeout 5s
|
||||
--health-retries 5
|
||||
redis-sentinel:
|
||||
image: bitnami/redis-sentinel:7.2.4
|
||||
image: bitnami/redis-sentinel:6.2
|
||||
env:
|
||||
REDIS_MASTER_SET: scality-s3
|
||||
REDIS_SENTINEL_PORT_NUMBER: '16379'
|
||||
|
@ -304,6 +260,7 @@ jobs:
|
|||
env:
|
||||
standalone.port: '4802'
|
||||
warpscript.maxops: '10000000'
|
||||
ENABLE_SENSISION: 't'
|
||||
ports:
|
||||
- 4802:4802
|
||||
- 8082:8082
|
||||
|
@ -329,19 +286,24 @@ jobs:
|
|||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
lfs: true
|
||||
- uses: actions/setup-node@v4
|
||||
- uses: actions/setup-node@v2
|
||||
with:
|
||||
node-version: '16.13.2'
|
||||
cache: yarn
|
||||
- uses: actions/setup-python@v5
|
||||
- uses: actions/setup-python@v2
|
||||
with:
|
||||
python-version: '3.9'
|
||||
cache: pip
|
||||
- uses: actions/cache@v2
|
||||
with:
|
||||
path: ~/.cache/pip
|
||||
key: ${{ runner.os }}-pip
|
||||
- name: Install python deps
|
||||
run: pip install -r requirements.txt
|
||||
run: |
|
||||
pip install requests
|
||||
pip install redis
|
||||
- name: install dependencies
|
||||
run: yarn install --frozen-lockfile --network-concurrency 1
|
||||
- name: Wait for warp10 a little bit
|
||||
|
@ -349,13 +311,6 @@ jobs:
|
|||
- name: ${{ matrix.test.name }}
|
||||
run: ${{ matrix.test.command }}
|
||||
env: ${{ matrix.test.env }}
|
||||
- name: 'Debug: SSH to runner'
|
||||
uses: scality/actions/action-ssh-to-runner@1.7.0
|
||||
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
|
||||
continue-on-error: true
|
||||
with:
|
||||
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
|
||||
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
|
||||
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
|
||||
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
|
||||
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
|
||||
- name: Setup tmate session
|
||||
uses: mxschmitt/action-tmate@v3
|
||||
if: failure()
|
||||
|
|
31
Dockerfile
31
Dockerfile
|
@ -1,31 +0,0 @@
|
|||
FROM node:16.13.2-buster-slim
|
||||
|
||||
WORKDIR /usr/src/app
|
||||
|
||||
COPY package.json yarn.lock /usr/src/app/
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y \
|
||||
curl \
|
||||
gnupg2
|
||||
|
||||
RUN curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
|
||||
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y jq git python3 build-essential yarn --no-install-recommends \
|
||||
&& yarn cache clean \
|
||||
&& yarn install --frozen-lockfile --production --ignore-optional --network-concurrency=1 \
|
||||
&& apt-get autoremove --purge -y python3 git build-essential \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& yarn cache clean \
|
||||
&& rm -rf ~/.node-gyp \
|
||||
&& rm -rf /tmp/yarn-*
|
||||
|
||||
# Keep the .git directory in order to properly report version
|
||||
COPY . /usr/src/app
|
||||
|
||||
ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
|
||||
CMD [ "yarn", "start" ]
|
||||
|
||||
EXPOSE 8100
|
37
README.md
37
README.md
|
@ -3,8 +3,9 @@
|
|||
![Utapi logo](res/utapi-logo.png)
|
||||
|
||||
[![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi)
|
||||
[![Scality CI][badgepriv]](http://ci.ironmann.io/gh/scality/utapi)
|
||||
|
||||
Service Utilization API for tracking resource usage and metrics reporting.
|
||||
Service Utilization API for tracking resource usage and metrics reporting
|
||||
|
||||
## Design
|
||||
|
||||
|
@ -87,13 +88,13 @@ Server is running.
|
|||
1. Create an IAM user
|
||||
|
||||
```
|
||||
aws iam --endpoint-url <endpoint> create-user --user-name <user-name>
|
||||
aws iam --endpoint-url <endpoint> create-user --user-name utapiuser
|
||||
```
|
||||
|
||||
2. Create access key for the user
|
||||
|
||||
```
|
||||
aws iam --endpoint-url <endpoint> create-access-key --user-name <user-name>
|
||||
aws iam --endpoint-url <endpoint> create-access-key --user-name utapiuser
|
||||
```
|
||||
|
||||
3. Define a managed IAM policy
|
||||
|
@ -202,11 +203,12 @@ Server is running.
|
|||
5. Attach user to the managed policy
|
||||
|
||||
```
|
||||
aws --endpoint-url <endpoint> iam attach-user-policy --user-name
|
||||
<user-name> --policy-arn <policy arn>
|
||||
aws --endpoint-url <endpoint> iam attach-user-policy --user-name utapiuser
|
||||
--policy-arn <policy arn>
|
||||
```
|
||||
|
||||
Now the user has access to ListMetrics request in Utapi on all buckets.
|
||||
Now the user `utapiuser` has access to ListMetrics request in Utapi on all
|
||||
buckets.
|
||||
|
||||
### Signing request with Auth V4
|
||||
|
||||
|
@ -222,18 +224,16 @@ following urls for reference.
|
|||
You may also view examples making a request with Auth V4 using various languages
|
||||
and AWS SDKs [here](/examples).
|
||||
|
||||
Alternatively, you can use a nifty command line tool available in Scality's
|
||||
CloudServer.
|
||||
Alternatively, you can use a nifty command line tool available in Scality's S3.
|
||||
|
||||
You can git clone the CloudServer repo from here
|
||||
https://github.com/scality/cloudserver and follow the instructions in the README
|
||||
to install the dependencies.
|
||||
You can git clone S3 repo from here https://github.com/scality/S3.git and follow
|
||||
the instructions in README to install the dependencies.
|
||||
|
||||
If you have CloudServer running inside a docker container you can docker exec
|
||||
into the CloudServer container as
|
||||
If you have S3 running inside a docker container you can docker exec into the S3
|
||||
container as
|
||||
|
||||
```
|
||||
docker exec -it <container-id> bash
|
||||
docker exec -it <container id> bash
|
||||
```
|
||||
|
||||
and then run the command
|
||||
|
@ -271,7 +271,7 @@ Usage: list_metrics [options]
|
|||
-v, --verbose
|
||||
```
|
||||
|
||||
An example call to list metrics for a bucket `demo` to Utapi in a https enabled
|
||||
A typical call to list metrics for a bucket `demo` to Utapi in a https enabled
|
||||
deployment would be
|
||||
|
||||
```
|
||||
|
@ -283,7 +283,7 @@ Both start and end times are time expressed as UNIX epoch timestamps **expressed
|
|||
in milliseconds**.
|
||||
|
||||
Keep in mind, since Utapi metrics are normalized to the nearest 15 min.
|
||||
interval, start time and end time need to be in the specific format as follows.
|
||||
interval, so start time and end time need to be in specific format as follows.
|
||||
|
||||
#### Start time
|
||||
|
||||
|
@ -297,7 +297,7 @@ Date: Tue Oct 11 2016 17:35:25 GMT-0700 (PDT)
|
|||
|
||||
Unix timestamp (milliseconds): 1476232525320
|
||||
|
||||
Here's an example JS method to get a start timestamp
|
||||
Here's a typical JS method to get start timestamp
|
||||
|
||||
```javascript
|
||||
function getStartTimestamp(t) {
|
||||
|
@ -317,7 +317,7 @@ seconds and milliseconds set to 59 and 999 respectively. So valid end timestamps
|
|||
would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and
|
||||
`09:59:59:999`.
|
||||
|
||||
Here's an example JS method to get an end timestamp
|
||||
Here's a typical JS method to get end timestamp
|
||||
|
||||
```javascript
|
||||
function getEndTimestamp(t) {
|
||||
|
@ -342,3 +342,4 @@ In order to contribute, please follow the
|
|||
https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md).
|
||||
|
||||
[badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg
|
||||
[badgepriv]: http://ci.ironmann.io/gh/scality/utapi.svg?style=svg
|
||||
|
|
|
@ -27,7 +27,7 @@ x-models:
|
|||
|
||||
services:
|
||||
redis-0:
|
||||
image: redis:7.2.4
|
||||
image: redis:6
|
||||
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||
ports:
|
||||
- 6379:6379
|
||||
|
@ -35,7 +35,7 @@ services:
|
|||
- HOST_IP="${EXTERNAL_HOST}"
|
||||
|
||||
redis-1:
|
||||
image: redis:7.2.4
|
||||
image: redis:6
|
||||
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
|
||||
ports:
|
||||
- 6380:6380
|
||||
|
@ -43,7 +43,7 @@ services:
|
|||
- HOST_IP="${EXTERNAL_HOST}"
|
||||
|
||||
redis-sentinel-0:
|
||||
image: redis:7.2.4
|
||||
image: redis:6
|
||||
command: |-
|
||||
bash -c 'cat > /tmp/sentinel.conf <<EOF
|
||||
port 16379
|
||||
|
|
|
@ -1,47 +0,0 @@
|
|||
#!/bin/bash
|
||||
|
||||
# set -e stops the execution of a script if a command or pipeline has an error
|
||||
set -e
|
||||
|
||||
# modifying config.json
|
||||
JQ_FILTERS_CONFIG="."
|
||||
|
||||
if [[ "$LOG_LEVEL" ]]; then
|
||||
if [[ "$LOG_LEVEL" == "info" || "$LOG_LEVEL" == "debug" || "$LOG_LEVEL" == "trace" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .log.logLevel=\"$LOG_LEVEL\""
|
||||
echo "Log level has been modified to $LOG_LEVEL"
|
||||
else
|
||||
echo "The log level you provided is incorrect (info/debug/trace)"
|
||||
fi
|
||||
fi
|
||||
|
||||
if [[ "$WORKERS" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .workers=\"$WORKERS\""
|
||||
fi
|
||||
|
||||
if [[ "$REDIS_HOST" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.host=\"$REDIS_HOST\""
|
||||
fi
|
||||
|
||||
if [[ "$REDIS_PORT" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.port=\"$REDIS_PORT\""
|
||||
fi
|
||||
|
||||
if [[ "$VAULTD_HOST" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.host=\"$VAULTD_HOST\""
|
||||
fi
|
||||
|
||||
if [[ "$VAULTD_PORT" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.port=\"$VAULTD_PORT\""
|
||||
fi
|
||||
|
||||
if [[ "$HEALTHCHECKS_ALLOWFROM" ]]; then
|
||||
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .healthChecks.allowFrom=[\"$HEALTHCHECKS_ALLOWFROM\"]"
|
||||
fi
|
||||
|
||||
if [[ $JQ_FILTERS_CONFIG != "." ]]; then
|
||||
jq "$JQ_FILTERS_CONFIG" config.json > config.json.tmp
|
||||
mv config.json.tmp config.json
|
||||
fi
|
||||
|
||||
exec "$@"
|
|
@ -1,42 +0,0 @@
|
|||
# Utapi Release Plan
|
||||
|
||||
## Docker Image Generation
|
||||
|
||||
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
|
||||
Utapi has one namespace there:
|
||||
|
||||
* Namespace: ghcr.io/scality/utapi
|
||||
|
||||
With every CI build, the CI will push images, tagging the
|
||||
content with the developer branch's short SHA-1 commit hash.
|
||||
This allows those images to be used by developers, CI builds,
|
||||
build chain and so on.
|
||||
|
||||
Tagged versions of utapi will be stored in the production namespace.
|
||||
|
||||
## How to Pull Docker Images
|
||||
|
||||
```sh
|
||||
docker pull ghcr.io/scality/utapi:<commit hash>
|
||||
docker pull ghcr.io/scality/utapi:<tag>
|
||||
```
|
||||
|
||||
## Release Process
|
||||
|
||||
To release a production image:
|
||||
|
||||
* Name the tag for the repository and Docker image.
|
||||
|
||||
* Use the `yarn version` command with the same tag to update `package.json`.
|
||||
|
||||
* Create a PR and merge the `package.json` change.
|
||||
|
||||
* Tag the repository using the same tag.
|
||||
|
||||
* [Force a build] using:
|
||||
* A given branch that ideally matches the tag.
|
||||
* The `release` stage.
|
||||
* An extra property with the name `tag` and its value being the actual tag.
|
||||
|
||||
[Force a build]:
|
||||
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force
|
|
@ -1,90 +0,0 @@
|
|||
import sys, os, base64, datetime, hashlib, hmac, datetime, calendar, json
|
||||
import requests # pip install requests
|
||||
|
||||
access_key = '9EQTVVVCLSSG6QBMNKO5'
|
||||
secret_key = 'T5mK/skkkwJ/mTjXZnHyZ5UzgGIN=k9nl4dyTmDH'
|
||||
|
||||
method = 'POST'
|
||||
service = 's3'
|
||||
host = 'localhost:8100'
|
||||
region = 'us-east-1'
|
||||
canonical_uri = '/buckets'
|
||||
canonical_querystring = 'Action=ListMetrics&Version=20160815'
|
||||
content_type = 'application/x-amz-json-1.0'
|
||||
algorithm = 'AWS4-HMAC-SHA256'
|
||||
|
||||
t = datetime.datetime.utcnow()
|
||||
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
|
||||
date_stamp = t.strftime('%Y%m%d')
|
||||
|
||||
# Key derivation functions. See:
|
||||
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
|
||||
def sign(key, msg):
|
||||
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
|
||||
|
||||
def getSignatureKey(key, date_stamp, regionName, serviceName):
|
||||
kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
|
||||
kRegion = sign(kDate, regionName)
|
||||
kService = sign(kRegion, serviceName)
|
||||
kSigning = sign(kService, 'aws4_request')
|
||||
return kSigning
|
||||
|
||||
def get_start_time(t):
|
||||
start = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
|
||||
return calendar.timegm(start.utctimetuple()) * 1000;
|
||||
|
||||
def get_end_time(t):
|
||||
end = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
|
||||
return calendar.timegm(end.utctimetuple()) * 1000 - 1;
|
||||
|
||||
start_time = get_start_time(datetime.datetime(2016, 1, 1, 0, 0, 0, 0))
|
||||
end_time = get_end_time(datetime.datetime(2016, 2, 1, 0, 0, 0, 0))
|
||||
|
||||
# Request parameters for listing Utapi bucket metrics--passed in a JSON block.
|
||||
bucketListing = {
|
||||
'buckets': [ 'utapi-test' ],
|
||||
'timeRange': [ start_time, end_time ],
|
||||
}
|
||||
|
||||
request_parameters = json.dumps(bucketListing)
|
||||
|
||||
payload_hash = hashlib.sha256(request_parameters).hexdigest()
|
||||
|
||||
canonical_headers = \
|
||||
'content-type:{0}\nhost:{1}\nx-amz-content-sha256:{2}\nx-amz-date:{3}\n' \
|
||||
.format(content_type, host, payload_hash, amz_date)
|
||||
|
||||
signed_headers = 'content-type;host;x-amz-content-sha256;x-amz-date'
|
||||
|
||||
canonical_request = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}' \
|
||||
.format(method, canonical_uri, canonical_querystring, canonical_headers,
|
||||
signed_headers, payload_hash)
|
||||
|
||||
credential_scope = '{0}/{1}/{2}/aws4_request' \
|
||||
.format(date_stamp, region, service)
|
||||
|
||||
string_to_sign = '{0}\n{1}\n{2}\n{3}' \
|
||||
.format(algorithm, amz_date, credential_scope,
|
||||
hashlib.sha256(canonical_request).hexdigest())
|
||||
|
||||
signing_key = getSignatureKey(secret_key, date_stamp, region, service)
|
||||
|
||||
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
|
||||
hashlib.sha256).hexdigest()
|
||||
|
||||
authorization_header = \
|
||||
'{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}' \
|
||||
.format(algorithm, access_key, credential_scope, signed_headers, signature)
|
||||
|
||||
# The 'host' header is added automatically by the Python 'requests' library.
|
||||
headers = {
|
||||
'Content-Type': content_type,
|
||||
'X-Amz-Content-Sha256': payload_hash,
|
||||
'X-Amz-Date': amz_date,
|
||||
'Authorization': authorization_header
|
||||
}
|
||||
|
||||
endpoint = 'http://' + host + canonical_uri + '?' + canonical_querystring;
|
||||
|
||||
r = requests.post(endpoint, data=request_parameters, headers=headers)
|
||||
print (r.text)
|
|
@ -1,20 +0,0 @@
|
|||
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
|
||||
|
||||
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json
|
||||
|
||||
WORKDIR ${HOME_DIR}/utapi
|
||||
|
||||
COPY ./package.json ./yarn.lock ${HOME_DIR}/utapi
|
||||
|
||||
# Remove when gitcache is sorted out
|
||||
RUN rm /root/.gitconfig
|
||||
|
||||
RUN yarn install --production --frozen-lockfile --network-concurrency 1
|
||||
|
||||
COPY . ${HOME_DIR}/utapi
|
||||
|
||||
RUN chown -R ${USER} ${HOME_DIR}/utapi
|
||||
|
||||
USER ${USER}
|
||||
|
||||
CMD bash -c "source ${CONF_DIR}/env && export && supervisord -c ${CONF_DIR}/${SUPERVISORD_CONF}"
|
|
@ -0,0 +1 @@
|
|||
*.jar filter=lfs diff=lfs merge=lfs -text
|
|
@ -1,2 +0,0 @@
|
|||
standalone.host = 0.0.0.0
|
||||
standalone.port = 4802
|
|
@ -13,7 +13,7 @@ RUN apk add zip unzip build-base \
|
|||
&& cd .. \
|
||||
&& go build -a -o /usr/local/go/warp10_sensision_exporter
|
||||
|
||||
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
|
||||
FROM registry.scality.com/utapi/warp10:2.8.1-95-g73e7de80
|
||||
|
||||
# Override baked in version
|
||||
# Remove when updating to a numbered release
|
||||
|
@ -27,6 +27,8 @@ ENV SENSISION_DATA_DIR /data/sensision
|
|||
ENV SENSISION_PORT 8082
|
||||
|
||||
# Modify Warp 10 default config
|
||||
ENV standalone.host 0.0.0.0
|
||||
ENV standalone.port 4802
|
||||
ENV standalone.home /opt/warp10
|
||||
ENV warpscript.repository.directory /usr/local/share/warpscript
|
||||
ENV warp.token.file /static.tokens
|
||||
|
@ -51,6 +53,6 @@ COPY --from=builder /usr/local/go/warp10_sensision_exporter /usr/local/bin/warp1
|
|||
ADD ./images/warp10/s6 /etc
|
||||
ADD ./warpscript /usr/local/share/warpscript
|
||||
ADD ./images/warp10/static.tokens /
|
||||
ADD ./images/warp10/90-default-host-port.conf $WARP10_CONF_TEMPLATES/90-default-host-port.conf
|
||||
|
||||
CMD /init
|
||||
|
||||
|
|
|
@ -3,8 +3,9 @@
|
|||
JAVA="/usr/bin/java"
|
||||
JAVA_OPTS=""
|
||||
|
||||
VERSION=1.0.23
|
||||
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
|
||||
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${SENSISION_VERSION}.jar
|
||||
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${VERSION}.jar
|
||||
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
|
||||
SENSISION_CLASS=io.warp10.sensision.Main
|
||||
export MALLOC_ARENA_MAX=1
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
version https://git-lfs.github.com/spec/v1
|
||||
oid sha256:389d2135867c399a389901460c5f2cc09f4857d0c6d08632c2638c25fb150c46
|
||||
size 15468553
|
1
index.js
1
index.js
|
@ -1,4 +1,5 @@
|
|||
/* eslint-disable global-require */
|
||||
|
||||
// eslint-disable-line strict
|
||||
let toExport;
|
||||
|
||||
|
|
|
@ -1,13 +1,35 @@
|
|||
/* eslint-disable no-bitwise */
|
||||
const assert = require('assert');
|
||||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
|
||||
/**
|
||||
* Reads from a config file and returns the content as a config object
|
||||
*/
|
||||
class Config {
|
||||
constructor(config) {
|
||||
this.component = config.component;
|
||||
constructor() {
|
||||
/*
|
||||
* By default, the config file is "config.json" at the root.
|
||||
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
|
||||
*/
|
||||
this._basePath = path.resolve(__dirname, '..');
|
||||
this.path = `${this._basePath}/config.json`;
|
||||
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
|
||||
this.path = process.env.UTAPI_CONFIG_FILE;
|
||||
}
|
||||
|
||||
// Read config automatically
|
||||
this._getConfig();
|
||||
}
|
||||
|
||||
_getConfig() {
|
||||
let config;
|
||||
try {
|
||||
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
|
||||
config = JSON.parse(data);
|
||||
} catch (err) {
|
||||
throw new Error(`could not parse config file: ${err.message}`);
|
||||
}
|
||||
|
||||
this.port = 9500;
|
||||
if (config.port !== undefined) {
|
||||
|
@ -93,26 +115,18 @@ class Config {
|
|||
}
|
||||
}
|
||||
|
||||
if (config.vaultclient) {
|
||||
// Instance passed from outside
|
||||
this.vaultclient = config.vaultclient;
|
||||
this.vaultd = null;
|
||||
} else {
|
||||
// Connection data
|
||||
this.vaultclient = null;
|
||||
this.vaultd = {};
|
||||
if (config.vaultd) {
|
||||
if (config.vaultd.port !== undefined) {
|
||||
assert(Number.isInteger(config.vaultd.port)
|
||||
&& config.vaultd.port > 0,
|
||||
'bad config: vaultd port must be a positive integer');
|
||||
this.vaultd.port = config.vaultd.port;
|
||||
}
|
||||
if (config.vaultd.host !== undefined) {
|
||||
assert.strictEqual(typeof config.vaultd.host, 'string',
|
||||
'bad config: vaultd host must be a string');
|
||||
this.vaultd.host = config.vaultd.host;
|
||||
}
|
||||
this.vaultd = {};
|
||||
if (config.vaultd) {
|
||||
if (config.vaultd.port !== undefined) {
|
||||
assert(Number.isInteger(config.vaultd.port)
|
||||
&& config.vaultd.port > 0,
|
||||
'bad config: vaultd port must be a positive integer');
|
||||
this.vaultd.port = config.vaultd.port;
|
||||
}
|
||||
if (config.vaultd.host !== undefined) {
|
||||
assert.strictEqual(typeof config.vaultd.host, 'string',
|
||||
'bad config: vaultd host must be a string');
|
||||
this.vaultd.host = config.vaultd.host;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,11 +141,12 @@ class Config {
|
|||
const { key, cert, ca } = config.certFilePaths
|
||||
? config.certFilePaths : {};
|
||||
if (key && cert) {
|
||||
const keypath = key;
|
||||
const certpath = cert;
|
||||
const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
|
||||
const certpath = (cert[0] === '/')
|
||||
? cert : `${this._basePath}/${cert}`;
|
||||
let capath;
|
||||
if (ca) {
|
||||
capath = ca;
|
||||
capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
|
||||
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
|
||||
`File not found or unreachable: ${capath}`);
|
||||
}
|
||||
|
@ -157,13 +172,8 @@ class Config {
|
|||
+ 'expireMetrics must be a boolean');
|
||||
this.expireMetrics = config.expireMetrics;
|
||||
}
|
||||
|
||||
if (config.onlyCountLatestWhenObjectLocked !== undefined) {
|
||||
assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
|
||||
'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
|
||||
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
|
||||
}
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Config;
|
||||
module.exports = new Config();
|
||||
|
|
|
@ -81,17 +81,6 @@ class Datastore {
|
|||
return this._client.call((backend, done) => backend.incr(key, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* increment value of a key by the provided value
|
||||
* @param {string} key - key holding the value
|
||||
* @param {string} value - value containing the data
|
||||
* @param {callback} cb - callback
|
||||
* @return {undefined}
|
||||
*/
|
||||
incrby(key, value, cb) {
|
||||
return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* decrement value of a key by 1
|
||||
* @param {string} key - key holding the value
|
||||
|
|
|
@ -6,6 +6,8 @@ const async = require('async');
|
|||
const { errors } = require('arsenal');
|
||||
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
|
||||
const s3metricResponseJSON = require('../models/s3metricResponse');
|
||||
const config = require('./Config');
|
||||
const Vault = require('./Vault');
|
||||
|
||||
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
|
||||
|
||||
|
@ -21,6 +23,7 @@ class ListMetrics {
|
|||
constructor(metric, component) {
|
||||
this.metric = metric;
|
||||
this.service = component;
|
||||
this.vault = new Vault(config);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -80,10 +83,9 @@ class ListMetrics {
|
|||
const resources = validator.get(this.metric);
|
||||
const timeRange = validator.get('timeRange');
|
||||
const datastore = utapiRequest.getDatastore();
|
||||
const vault = utapiRequest.getVault();
|
||||
// map account ids to canonical ids
|
||||
if (this.metric === 'accounts') {
|
||||
return vault.getCanonicalIds(resources, log, (err, list) => {
|
||||
return this.vault.getCanonicalIds(resources, log, (err, list) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
@ -122,11 +124,10 @@ class ListMetrics {
|
|||
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
|
||||
const timeRange = [start - fifteenMinutes, end];
|
||||
const datastore = utapiRequest.getDatastore();
|
||||
const vault = utapiRequest.getVault();
|
||||
|
||||
// map account ids to canonical ids
|
||||
if (this.metric === 'accounts') {
|
||||
return vault.getCanonicalIds(resources, log, (err, list) => {
|
||||
return this.vault.getCanonicalIds(resources, log, (err, list) => {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
|
@ -312,10 +313,11 @@ class ListMetrics {
|
|||
});
|
||||
|
||||
if (!areMetricsPositive) {
|
||||
log.info('negative metric value found', {
|
||||
error: resource,
|
||||
method: 'ListMetrics.getMetrics',
|
||||
});
|
||||
return cb(errors.InternalError.customizeDescription(
|
||||
'Utapi is in a transient state for this time period as '
|
||||
+ 'metrics are being collected. Please try again in a few '
|
||||
+ 'minutes.',
|
||||
));
|
||||
}
|
||||
/**
|
||||
* Batch result is of the format
|
||||
|
|
|
@ -99,7 +99,6 @@ const metricObj = {
|
|||
buckets: 'bucket',
|
||||
accounts: 'accountId',
|
||||
users: 'userId',
|
||||
location: 'location',
|
||||
};
|
||||
|
||||
class UtapiClient {
|
||||
|
@ -123,17 +122,13 @@ class UtapiClient {
|
|||
const api = (config || {}).logApi || werelogs;
|
||||
this.log = new api.Logger('UtapiClient');
|
||||
// By default, we push all resource types
|
||||
this.metrics = ['buckets', 'accounts', 'users', 'service', 'location'];
|
||||
this.metrics = ['buckets', 'accounts', 'users', 'service'];
|
||||
this.service = 's3';
|
||||
this.disableOperationCounters = false;
|
||||
this.enabledOperationCounters = [];
|
||||
this.disableClient = true;
|
||||
|
||||
if (config && !config.disableClient) {
|
||||
this.disableClient = false;
|
||||
this.expireMetrics = config.expireMetrics;
|
||||
this.expireMetricsTTL = config.expireMetricsTTL || 0;
|
||||
|
||||
if (config.metrics) {
|
||||
const message = 'invalid property in UtapiClient configuration';
|
||||
assert(Array.isArray(config.metrics), `${message}: metrics `
|
||||
|
@ -161,6 +156,9 @@ class UtapiClient {
|
|||
if (config.enabledOperationCounters) {
|
||||
this.enabledOperationCounters = config.enabledOperationCounters;
|
||||
}
|
||||
this.disableClient = false;
|
||||
this.expireMetrics = config.expireMetrics;
|
||||
this.expireMetricsTTL = config.expireMetricsTTL || 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -548,9 +546,7 @@ class UtapiClient {
|
|||
if (this._isCounterEnabled(counterAction)) {
|
||||
cmds.push(['incr', generateKey(p, counterAction, timestamp)]);
|
||||
}
|
||||
cmds.push(['zrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp]);
|
||||
});
|
||||
|
||||
return this.ds.batch(cmds, (err, results) => {
|
||||
if (err) {
|
||||
log.error('error pushing metric', {
|
||||
|
@ -584,48 +580,13 @@ class UtapiClient {
|
|||
// empty.
|
||||
actionCounter = Number.isNaN(actionCounter)
|
||||
|| actionCounter < 0 ? 1 : actionCounter;
|
||||
|
||||
if (Number.isInteger(params.byteLength)) {
|
||||
/* byteLength is passed in from cloudserver under the follow conditions:
|
||||
* - bucket versioning is suspended
|
||||
* - object version id is null
|
||||
* - the content length of the object exists
|
||||
* In this case, the master key is deleted and replaced with a delete marker.
|
||||
* The decrement accounts for the deletion of the master key when utapi reports
|
||||
* on the number of objects.
|
||||
*/
|
||||
actionCounter -= 1;
|
||||
}
|
||||
|
||||
const key = generateStateKey(p, 'numberOfObjects');
|
||||
|
||||
const byteArr = results[index + commandsGroupSize - 1][1];
|
||||
const oldByteLength = byteArr ? parseInt(byteArr[0], 10) : 0;
|
||||
const newByteLength = member.serialize(Math.max(0, oldByteLength - params.byteLength));
|
||||
|
||||
cmds2.push(
|
||||
['zremrangebyscore', key, timestamp, timestamp],
|
||||
['zadd', key, timestamp, member.serialize(actionCounter)],
|
||||
|
||||
);
|
||||
|
||||
if (Number.isInteger(params.byteLength)) {
|
||||
cmds2.push(
|
||||
['decr', generateCounter(p, 'numberOfObjectsCounter')],
|
||||
['decrby', generateCounter(p, 'storageUtilizedCounter'), params.byteLength],
|
||||
);
|
||||
}
|
||||
|
||||
if (byteArr) {
|
||||
cmds2.push(
|
||||
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp],
|
||||
['zadd', generateStateKey(p, 'storageUtilized'), timestamp, newByteLength],
|
||||
);
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
|
||||
if (noErr) {
|
||||
return this.ds.batch(cmds2, cb);
|
||||
}
|
||||
|
@ -1156,69 +1117,6 @@ class UtapiClient {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} location - name of data location
|
||||
* @param {number} updateSize - size in bytes to update location metric by,
|
||||
* could be negative, indicating deleted object
|
||||
* @param {string} reqUid - Request Unique Identifier
|
||||
* @param {function} callback - callback to call
|
||||
* @return {undefined}
|
||||
*/
|
||||
pushLocationMetric(location, updateSize, reqUid, callback) {
|
||||
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
|
||||
const params = {
|
||||
level: 'location',
|
||||
service: 's3',
|
||||
location,
|
||||
};
|
||||
this._checkMetricTypes(params);
|
||||
const action = (updateSize < 0) ? 'decrby' : 'incrby';
|
||||
const size = (updateSize < 0) ? -updateSize : updateSize;
|
||||
return this.ds[action](generateKey(params, 'locationStorage'), size,
|
||||
err => {
|
||||
if (err) {
|
||||
log.error('error pushing metric', {
|
||||
method: 'UtapiClient.pushLocationMetric',
|
||||
error: err,
|
||||
});
|
||||
return callback(errors.InternalError);
|
||||
}
|
||||
return callback();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {string} location - name of data backend to get metric for
|
||||
* @param {string} reqUid - Request Unique Identifier
|
||||
* @param {function} callback - callback to call
|
||||
* @return {undefined}
|
||||
*/
|
||||
getLocationMetric(location, reqUid, callback) {
|
||||
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
|
||||
const params = {
|
||||
level: 'location',
|
||||
service: 's3',
|
||||
location,
|
||||
};
|
||||
const redisKey = generateKey(params, 'locationStorage');
|
||||
return this.ds.get(redisKey, (err, bytesStored) => {
|
||||
if (err) {
|
||||
log.error('error getting metric', {
|
||||
method: 'UtapiClient: getLocationMetric',
|
||||
error: err,
|
||||
});
|
||||
return callback(errors.InternalError);
|
||||
}
|
||||
// if err and bytesStored are null, key does not exist yet
|
||||
if (bytesStored === null) {
|
||||
return callback(null, 0);
|
||||
}
|
||||
return callback(null, bytesStored);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Get storage used by bucket/account/user/service
|
||||
* @param {object} params - params for the metrics
|
||||
|
|
|
@ -16,19 +16,15 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
|
|||
? process.env.REINDEX_PYTHON_INTERPRETER
|
||||
: 'python3.7';
|
||||
|
||||
const EXIT_CODE_SENTINEL_CONNECTION = 100;
|
||||
|
||||
class UtapiReindex {
|
||||
constructor(config) {
|
||||
this._enabled = false;
|
||||
this._schedule = REINDEX_SCHEDULE;
|
||||
this._redis = {
|
||||
this._sentinel = {
|
||||
host: '127.0.0.1',
|
||||
port: 16379,
|
||||
name: 'scality-s3',
|
||||
sentinelPassword: '',
|
||||
sentinels: [{
|
||||
host: '127.0.0.1',
|
||||
port: 16379,
|
||||
}],
|
||||
};
|
||||
this._bucketd = {
|
||||
host: '127.0.0.1',
|
||||
|
@ -46,13 +42,14 @@ class UtapiReindex {
|
|||
if (config && config.password) {
|
||||
this._password = config.password;
|
||||
}
|
||||
if (config && config.redis) {
|
||||
if (config && config.sentinel) {
|
||||
const {
|
||||
name, sentinelPassword, sentinels,
|
||||
} = config.redis;
|
||||
this._redis.name = name || this._redis.name;
|
||||
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
|
||||
this._redis.sentinels = sentinels || this._redis.sentinels;
|
||||
host, port, name, sentinelPassword,
|
||||
} = config.sentinel;
|
||||
this._sentinel.host = host || this._sentinel.host;
|
||||
this._sentinel.port = port || this._sentinel.port;
|
||||
this._sentinel.name = name || this._sentinel.name;
|
||||
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
|
||||
}
|
||||
if (config && config.bucketd) {
|
||||
const { host, port } = config.bucketd;
|
||||
|
@ -64,16 +61,17 @@ class UtapiReindex {
|
|||
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
|
||||
}
|
||||
|
||||
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
|
||||
|
||||
this._requestLogger = this._log.newRequestLogger();
|
||||
}
|
||||
|
||||
_getRedisClient() {
|
||||
const client = new RedisClient({
|
||||
sentinels: this._redis.sentinels,
|
||||
name: this._redis.name,
|
||||
sentinelPassword: this._redis.sentinelPassword,
|
||||
sentinels: [{
|
||||
host: this._sentinel.host,
|
||||
port: this._sentinel.port,
|
||||
}],
|
||||
name: this._sentinel.name,
|
||||
sentinelPassword: this._sentinel.sentinelPassword,
|
||||
password: this._password,
|
||||
});
|
||||
client.connect();
|
||||
|
@ -88,18 +86,17 @@ class UtapiReindex {
|
|||
return this.ds.del(REINDEX_LOCK_KEY);
|
||||
}
|
||||
|
||||
_buildFlags(sentinel) {
|
||||
_buildFlags() {
|
||||
const flags = {
|
||||
/* eslint-disable camelcase */
|
||||
sentinel_ip: sentinel.host,
|
||||
sentinel_port: sentinel.port,
|
||||
sentinel_cluster_name: this._redis.name,
|
||||
sentinel_ip: this._sentinel.host,
|
||||
sentinel_port: this._sentinel.port,
|
||||
sentinel_cluster_name: this._sentinel.name,
|
||||
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
|
||||
};
|
||||
if (this._redis.sentinelPassword) {
|
||||
flags.redis_password = this._redis.sentinelPassword;
|
||||
if (this._sentinel.sentinelPassword) {
|
||||
flags.redis_password = this._sentinel.sentinelPassword;
|
||||
}
|
||||
|
||||
/* eslint-enable camelcase */
|
||||
const opts = [];
|
||||
Object.keys(flags)
|
||||
|
@ -108,15 +105,11 @@ class UtapiReindex {
|
|||
opts.push(name);
|
||||
opts.push(flags[flag]);
|
||||
});
|
||||
|
||||
if (this._onlyCountLatestWhenObjectLocked) {
|
||||
opts.push('--only-latest-when-locked');
|
||||
}
|
||||
return opts;
|
||||
}
|
||||
|
||||
_runScriptWithSentinels(path, remainingSentinels, done) {
|
||||
const flags = this._buildFlags(remainingSentinels.shift());
|
||||
_runScript(path, done) {
|
||||
const flags = this._buildFlags();
|
||||
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
|
||||
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
|
||||
process.stdout.on('data', data => {
|
||||
|
@ -143,17 +136,6 @@ class UtapiReindex {
|
|||
statusCode: code,
|
||||
script: path,
|
||||
});
|
||||
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
|
||||
if (remainingSentinels.length > 0) {
|
||||
this._requestLogger.info('retrying with next sentinel host', {
|
||||
script: path,
|
||||
});
|
||||
return this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||
}
|
||||
this._requestLogger.error('no more sentinel host to try', {
|
||||
script: path,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
this._requestLogger.info('script exited successfully', {
|
||||
statusCode: code,
|
||||
|
@ -164,11 +146,6 @@ class UtapiReindex {
|
|||
});
|
||||
}
|
||||
|
||||
_runScript(path, done) {
|
||||
const remainingSentinels = [...this._redis.sentinels];
|
||||
this._runScriptWithSentinels(path, remainingSentinels, done);
|
||||
}
|
||||
|
||||
_attemptLock(job) {
|
||||
this._requestLogger.info('attempting to acquire the lock to begin job');
|
||||
this._lock()
|
||||
|
|
|
@ -14,15 +14,6 @@ class UtapiRequest {
|
|||
this._datastore = null;
|
||||
this._requestQuery = null;
|
||||
this._requestPath = null;
|
||||
this._vault = null;
|
||||
}
|
||||
|
||||
getVault() {
|
||||
return this._vault;
|
||||
}
|
||||
|
||||
setVault() {
|
||||
return this._vault;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,21 +1,16 @@
|
|||
import argparse
|
||||
import ast
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import redis
|
||||
import requests
|
||||
import redis
|
||||
import json
|
||||
import ast
|
||||
import sys
|
||||
from threading import Thread
|
||||
import time
|
||||
import urllib
|
||||
import re
|
||||
import sys
|
||||
from threading import Thread
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
_log = logging.getLogger('utapi-reindex:reporting')
|
||||
|
||||
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
|
||||
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||
import argparse
|
||||
|
||||
def get_options():
|
||||
parser = argparse.ArgumentParser()
|
||||
|
@ -34,19 +29,8 @@ class askRedis():
|
|||
|
||||
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
|
||||
self._password = password
|
||||
r = redis.Redis(
|
||||
host=ip,
|
||||
port=port,
|
||||
db=0,
|
||||
password=password,
|
||||
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
|
||||
)
|
||||
try:
|
||||
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
||||
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
|
||||
# use a specific error code to hint on retrying with another sentinel node
|
||||
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||
r = redis.Redis(host=ip, port=port, db=0, password=password)
|
||||
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
|
||||
|
||||
def read(self, resource, name):
|
||||
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
|
||||
|
@ -114,4 +98,4 @@ if __name__ == '__main__':
|
|||
data = U.read('accounts', userid)
|
||||
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
|
||||
userid, data["files"], data["total_size"])
|
||||
executor.submit(safe_print, content)
|
||||
executor.submit(safe_print, content)
|
|
@ -1,6 +1,5 @@
|
|||
import argparse
|
||||
import concurrent.futures as futures
|
||||
import functools
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
|
@ -9,9 +8,9 @@ import re
|
|||
import sys
|
||||
import time
|
||||
import urllib
|
||||
from pathlib import Path
|
||||
from collections import defaultdict, namedtuple
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from pprint import pprint
|
||||
|
||||
import redis
|
||||
import requests
|
||||
|
@ -25,9 +24,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
|
|||
|
||||
ACCOUNT_UPDATE_CHUNKSIZE = 100
|
||||
|
||||
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
|
||||
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
|
||||
|
||||
def get_options():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
|
||||
|
@ -36,38 +32,9 @@ def get_options():
|
|||
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
|
||||
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
|
||||
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
|
||||
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
|
||||
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
|
||||
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
|
||||
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
|
||||
group = parser.add_mutually_exclusive_group()
|
||||
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
|
||||
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
|
||||
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
|
||||
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
|
||||
|
||||
options = parser.parse_args()
|
||||
if options.bucket_file:
|
||||
with open(options.bucket_file) as f:
|
||||
options.bucket = [line.strip() for line in f if line.strip()]
|
||||
elif options.account_file:
|
||||
with open(options.account_file) as f:
|
||||
options.account = [line.strip() for line in f if line.strip()]
|
||||
|
||||
return options
|
||||
|
||||
def nonempty_string(flag):
|
||||
def inner(value):
|
||||
if not value.strip():
|
||||
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
|
||||
return value
|
||||
return inner
|
||||
|
||||
def existing_file(path):
|
||||
path = Path(path).resolve()
|
||||
if not path.exists():
|
||||
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
|
||||
return path
|
||||
return parser.parse_args()
|
||||
|
||||
def chunks(iterable, size):
|
||||
it = iter(iterable)
|
||||
|
@ -82,7 +49,7 @@ def _encoded(func):
|
|||
return urllib.parse.quote(val.encode('utf-8'))
|
||||
return inner
|
||||
|
||||
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])
|
||||
Bucket = namedtuple('Bucket', ['userid', 'name'])
|
||||
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
|
||||
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
|
||||
|
||||
|
@ -94,21 +61,15 @@ class InvalidListing(Exception):
|
|||
def __init__(self, bucket):
|
||||
super().__init__('Invalid contents found while listing bucket %s'%bucket)
|
||||
|
||||
class BucketNotFound(Exception):
|
||||
def __init__(self, bucket):
|
||||
super().__init__('Bucket %s not found'%bucket)
|
||||
|
||||
class BucketDClient:
|
||||
|
||||
'''Performs Listing calls against bucketd'''
|
||||
__url_attribute_format = '{addr}/default/attributes/{bucket}'
|
||||
__url_bucket_format = '{addr}/default/bucket/{bucket}'
|
||||
__url_format = '{addr}/default/bucket/{bucket}'
|
||||
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
|
||||
|
||||
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
|
||||
def __init__(self, bucketd_addr=None, max_retries=2):
|
||||
self._bucketd_addr = bucketd_addr
|
||||
self._max_retries = max_retries
|
||||
self._only_latest_when_locked = only_latest_when_locked
|
||||
self._session = requests.Session()
|
||||
|
||||
def _do_req(self, url, check_500=True, **kwargs):
|
||||
|
@ -140,7 +101,7 @@ class BucketDClient:
|
|||
parameters value. On the first request the function will be called with
|
||||
`None` and should return its initial value. Return `None` for the param to be excluded.
|
||||
'''
|
||||
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
|
||||
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
|
||||
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
|
||||
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
|
||||
is_truncated = True # Set to True for first loop
|
||||
|
@ -153,9 +114,6 @@ class BucketDClient:
|
|||
_log.debug('listing bucket bucket: %s params: %s'%(
|
||||
bucket, ', '.join('%s=%s'%p for p in params.items())))
|
||||
resp = self._do_req(url, params=params)
|
||||
if resp.status_code == 404:
|
||||
_log.debug('Bucket not found bucket: %s'%bucket)
|
||||
return
|
||||
if resp.status_code == 200:
|
||||
payload = resp.json()
|
||||
except ValueError as e:
|
||||
|
@ -177,37 +135,7 @@ class BucketDClient:
|
|||
else:
|
||||
is_truncated = len(payload) > 0
|
||||
|
||||
@functools.lru_cache(maxsize=16)
|
||||
def _get_bucket_attributes(self, name):
|
||||
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
|
||||
try:
|
||||
resp = self._do_req(url)
|
||||
if resp.status_code == 200:
|
||||
return resp.json()
|
||||
else:
|
||||
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
|
||||
raise BucketNotFound(name)
|
||||
except ValueError as e:
|
||||
_log.exception(e)
|
||||
_log.error('Invalid attributes response body! bucket:%s'%name)
|
||||
raise
|
||||
except MaxRetriesReached:
|
||||
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
|
||||
raise
|
||||
except Exception as e:
|
||||
_log.exception(e)
|
||||
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
|
||||
raise
|
||||
|
||||
def get_bucket_md(self, name):
|
||||
md = self._get_bucket_attributes(name)
|
||||
canonId = md.get('owner')
|
||||
if canonId is None:
|
||||
_log.error('No owner found for bucket %s'%name)
|
||||
raise InvalidListing(name)
|
||||
return Bucket(canonId, name, md.get('objectLockEnabled', False))
|
||||
|
||||
def list_buckets(self, account=None):
|
||||
def list_buckets(self, name = None):
|
||||
|
||||
def get_next_marker(p):
|
||||
if p is None:
|
||||
|
@ -219,24 +147,19 @@ class BucketDClient:
|
|||
'maxKeys': 1000,
|
||||
'marker': get_next_marker
|
||||
}
|
||||
|
||||
if account is not None:
|
||||
params['prefix'] = '%s..|..' % account
|
||||
|
||||
for _, payload in self._list_bucket(USERS_BUCKET, **params):
|
||||
buckets = []
|
||||
for result in payload.get('Contents', []):
|
||||
for result in payload['Contents']:
|
||||
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
|
||||
bucket = Bucket(*match.groups(), False)
|
||||
# We need to get the attributes for each bucket to determine if it is locked
|
||||
if self._only_latest_when_locked:
|
||||
bucket_attrs = self._get_bucket_attributes(bucket.name)
|
||||
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
|
||||
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
|
||||
buckets.append(bucket)
|
||||
|
||||
bucket = Bucket(*match.groups())
|
||||
if name is None or bucket.name == name:
|
||||
buckets.append(bucket)
|
||||
if buckets:
|
||||
yield buckets
|
||||
if name is not None:
|
||||
# Break on the first matching bucket if a name is given
|
||||
break
|
||||
|
||||
|
||||
def list_mpus(self, bucket):
|
||||
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
|
||||
|
@ -273,12 +196,18 @@ class BucketDClient:
|
|||
upload_id=key['value']['UploadId']))
|
||||
return keys
|
||||
|
||||
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
|
||||
def _sum_objects(self, bucket, listing):
|
||||
count = 0
|
||||
total_size = 0
|
||||
last_key = None
|
||||
try:
|
||||
for obj in listing:
|
||||
last_master = None
|
||||
last_size = None
|
||||
for status_code, payload in listing:
|
||||
contents = payload['Contents'] if isinstance(payload, dict) else payload
|
||||
if contents is None:
|
||||
_log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket, status_code))
|
||||
raise InvalidListing(bucket)
|
||||
for obj in contents:
|
||||
count += 1
|
||||
if isinstance(obj['value'], dict):
|
||||
# bucketd v6 returns a dict:
|
||||
data = obj.get('value', {})
|
||||
|
@ -287,51 +216,39 @@ class BucketDClient:
|
|||
# bucketd v7 returns an encoded string
|
||||
data = json.loads(obj['value'])
|
||||
size = data.get('content-length', 0)
|
||||
|
||||
is_latest = obj['key'] != last_key
|
||||
last_key = obj['key']
|
||||
|
||||
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
|
||||
_log.debug('Skipping versioned key: %s'%obj['key'])
|
||||
continue
|
||||
|
||||
count += 1
|
||||
total_size += size
|
||||
|
||||
except InvalidListing:
|
||||
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
|
||||
raise InvalidListing(bucket.name)
|
||||
return count, total_size
|
||||
# If versioned, subtract the size of the master to avoid double counting
|
||||
if last_master is not None and obj['key'].startswith(last_master + '\x00'):
|
||||
_log.debug('Detected versioned key: %s - subtracting master size: %i'% (
|
||||
obj['key'],
|
||||
last_size,
|
||||
))
|
||||
total_size -= last_size
|
||||
count -= 1
|
||||
last_master = None
|
||||
|
||||
def _extract_listing(self, key, listing):
|
||||
for status_code, payload in listing:
|
||||
contents = payload[key] if isinstance(payload, dict) else payload
|
||||
if contents is None:
|
||||
raise InvalidListing('')
|
||||
for obj in contents:
|
||||
yield obj
|
||||
# Only save master versions
|
||||
elif '\x00' not in obj['key']:
|
||||
last_master = obj['key']
|
||||
last_size = size
|
||||
|
||||
return count, total_size
|
||||
|
||||
def count_bucket_contents(self, bucket):
|
||||
|
||||
def get_key_marker(p):
|
||||
if p is None:
|
||||
def get_next_marker(p):
|
||||
if p is None or len(p) == 0:
|
||||
return ''
|
||||
return p.get('NextKeyMarker', '')
|
||||
|
||||
def get_vid_marker(p):
|
||||
if p is None:
|
||||
return ''
|
||||
return p.get('NextVersionIdMarker', '')
|
||||
return p[-1].get('key', '')
|
||||
|
||||
params = {
|
||||
'listingType': 'DelimiterVersions',
|
||||
'listingType': 'Basic',
|
||||
'maxKeys': 1000,
|
||||
'keyMarker': get_key_marker,
|
||||
'versionIdMarker': get_vid_marker,
|
||||
'gt': get_next_marker,
|
||||
}
|
||||
|
||||
listing = self._list_bucket(bucket.name, **params)
|
||||
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
|
||||
count, total_size = self._sum_objects(bucket.name, self._list_bucket(bucket.name, **params))
|
||||
return BucketContents(
|
||||
bucket=bucket,
|
||||
obj_count=count,
|
||||
|
@ -339,8 +256,7 @@ class BucketDClient:
|
|||
)
|
||||
|
||||
def count_mpu_parts(self, mpu):
|
||||
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
|
||||
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
|
||||
_bucket = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
|
||||
|
||||
def get_prefix(p):
|
||||
if p is None:
|
||||
|
@ -360,31 +276,13 @@ class BucketDClient:
|
|||
'listingType': 'Delimiter',
|
||||
}
|
||||
|
||||
listing = self._list_bucket(shadow_bucket_name, **params)
|
||||
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
|
||||
count, total_size = self._sum_objects(_bucket, self._list_bucket(_bucket, **params))
|
||||
return BucketContents(
|
||||
bucket=shadow_bucket,
|
||||
bucket=mpu.bucket._replace(name=_bucket),
|
||||
obj_count=0, # MPU parts are not counted towards numberOfObjects
|
||||
total_size=total_size
|
||||
)
|
||||
|
||||
def list_all_buckets(bucket_client):
|
||||
return bucket_client.list_buckets()
|
||||
|
||||
def list_specific_accounts(bucket_client, accounts):
|
||||
for account in accounts:
|
||||
yield from bucket_client.list_buckets(account=account)
|
||||
|
||||
def list_specific_buckets(bucket_client, buckets):
|
||||
batch = []
|
||||
for bucket in buckets:
|
||||
try:
|
||||
batch.append(bucket_client.get_bucket_md(bucket))
|
||||
except BucketNotFound:
|
||||
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
|
||||
continue
|
||||
|
||||
yield batch
|
||||
|
||||
def index_bucket(client, bucket):
|
||||
'''
|
||||
|
@ -424,16 +322,9 @@ def get_redis_client(options):
|
|||
host=options.sentinel_ip,
|
||||
port=options.sentinel_port,
|
||||
db=0,
|
||||
password=options.redis_password,
|
||||
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
|
||||
password=options.redis_password
|
||||
)
|
||||
try:
|
||||
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
||||
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
|
||||
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
|
||||
# use a specific error code to hint on retrying with another sentinel node
|
||||
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
|
||||
|
||||
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
|
||||
return redis.Redis(
|
||||
host=ip,
|
||||
port=port,
|
||||
|
@ -467,24 +358,16 @@ def log_report(resource, name, obj_count, total_size):
|
|||
|
||||
if __name__ == '__main__':
|
||||
options = get_options()
|
||||
if options.debug:
|
||||
_log.setLevel(logging.DEBUG)
|
||||
|
||||
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
|
||||
if options.bucket is not None and not options.bucket.strip():
|
||||
print('You must provide a bucket name with the --bucket flag')
|
||||
sys.exit(1)
|
||||
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries)
|
||||
redis_client = get_redis_client(options)
|
||||
account_reports = {}
|
||||
observed_buckets = set()
|
||||
failed_accounts = set()
|
||||
|
||||
if options.account:
|
||||
batch_generator = list_specific_accounts(bucket_client, options.account)
|
||||
elif options.bucket:
|
||||
batch_generator = list_specific_buckets(bucket_client, options.bucket)
|
||||
else:
|
||||
batch_generator = list_all_buckets(bucket_client)
|
||||
|
||||
with ThreadPoolExecutor(max_workers=options.worker) as executor:
|
||||
for batch in batch_generator:
|
||||
for batch in bucket_client.list_buckets(options.bucket):
|
||||
bucket_reports = {}
|
||||
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
|
||||
for job in futures.as_completed(jobs.keys()):
|
||||
|
@ -503,84 +386,51 @@ if __name__ == '__main__':
|
|||
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
|
||||
|
||||
# Bucket reports can be updated as we get them
|
||||
if options.dry_run:
|
||||
for bucket, report in bucket_reports.items():
|
||||
_log.info(
|
||||
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
|
||||
bucket, report['obj_count'], report['total_size']
|
||||
)
|
||||
)
|
||||
else:
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for bucket, report in bucket_reports.items():
|
||||
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
|
||||
log_report('buckets', bucket, report['obj_count'], report['total_size'])
|
||||
pipeline.execute()
|
||||
|
||||
stale_buckets = set()
|
||||
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
|
||||
if options.bucket:
|
||||
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
|
||||
elif options.account:
|
||||
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
|
||||
else:
|
||||
stale_buckets = recorded_buckets.difference(observed_buckets)
|
||||
|
||||
_log.info('Found %s stale buckets' % len(stale_buckets))
|
||||
if options.dry_run:
|
||||
_log.info("DryRun: not updating stale buckets")
|
||||
else:
|
||||
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for bucket in chunk:
|
||||
update_redis(pipeline, 'buckets', bucket, 0, 0)
|
||||
log_report('buckets', bucket, 0, 0)
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for bucket, report in bucket_reports.items():
|
||||
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
|
||||
log_report('buckets', bucket, report['obj_count'], report['total_size'])
|
||||
pipeline.execute()
|
||||
|
||||
# Account metrics are not updated if a bucket is specified
|
||||
if options.bucket:
|
||||
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
|
||||
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
|
||||
if options.bucket is None:
|
||||
stale_buckets = recorded_buckets.difference(observed_buckets)
|
||||
elif observed_buckets and options.bucket not in recorded_buckets:
|
||||
# The provided bucket does not exist, so clean up any metrics
|
||||
stale_buckets = { options.bucket }
|
||||
else:
|
||||
stale_buckets = set()
|
||||
|
||||
_log.info('Found %s stale buckets' % len(stale_buckets))
|
||||
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for bucket in chunk:
|
||||
update_redis(pipeline, 'buckets', bucket, 0, 0)
|
||||
log_report('buckets', bucket, 0, 0)
|
||||
pipeline.execute()
|
||||
|
||||
# Account metrics are not updated if a bucket is specified
|
||||
if options.bucket is None:
|
||||
# Don't update any accounts with failed listings
|
||||
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
|
||||
if options.dry_run:
|
||||
for userid, report in account_reports.items():
|
||||
_log.info(
|
||||
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
|
||||
userid, report['obj_count'], report['total_size']
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Update total account reports in chunks
|
||||
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for userid, report in chunk:
|
||||
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
|
||||
log_report('accounts', userid, report['obj_count'], report['total_size'])
|
||||
pipeline.execute()
|
||||
|
||||
if options.account:
|
||||
for account in options.account:
|
||||
if account in failed_accounts:
|
||||
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
|
||||
# Update total account reports in chunks
|
||||
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for userid, report in chunk:
|
||||
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
|
||||
log_report('accounts', userid, report['obj_count'], report['total_size'])
|
||||
pipeline.execute()
|
||||
|
||||
# Include failed_accounts in observed_accounts to avoid clearing metrics
|
||||
observed_accounts = failed_accounts.union(set(account_reports.keys()))
|
||||
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
|
||||
|
||||
if options.account:
|
||||
stale_accounts = { a for a in options.account if a not in observed_accounts }
|
||||
else:
|
||||
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
|
||||
stale_accounts = recorded_accounts.difference(observed_accounts)
|
||||
|
||||
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
|
||||
stale_accounts = recorded_accounts.difference(observed_accounts)
|
||||
_log.info('Found %s stale accounts' % len(stale_accounts))
|
||||
if options.dry_run:
|
||||
_log.info("DryRun: not updating stale accounts")
|
||||
else:
|
||||
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for account in chunk:
|
||||
update_redis(pipeline, 'accounts', account, 0, 0)
|
||||
log_report('accounts', account, 0, 0)
|
||||
pipeline.execute()
|
||||
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
|
||||
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
|
||||
for account in chunk:
|
||||
update_redis(pipeline, 'accounts', account, 0, 0)
|
||||
log_report('accounts', account, 0, 0)
|
||||
pipeline.execute()
|
||||
|
|
|
@ -68,10 +68,10 @@ const keys = {
|
|||
*/
|
||||
function getSchemaPrefix(params, timestamp) {
|
||||
const {
|
||||
bucket, accountId, userId, level, service, location,
|
||||
bucket, accountId, userId, level, service,
|
||||
} = params;
|
||||
// `service` property must remain last because other objects also include it
|
||||
const id = bucket || accountId || userId || location || service;
|
||||
const id = bucket || accountId || userId || service;
|
||||
const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:`
|
||||
: `${service}:${level}:${id}:`;
|
||||
return prefix;
|
||||
|
@ -86,13 +86,9 @@ function getSchemaPrefix(params, timestamp) {
|
|||
*/
|
||||
function generateKey(params, metric, timestamp) {
|
||||
const prefix = getSchemaPrefix(params, timestamp);
|
||||
if (params.location) {
|
||||
return `${prefix}locationStorage`;
|
||||
}
|
||||
return keys[metric](prefix);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Returns a list of the counters for a metric type
|
||||
* @param {object} params - object with metric type and id as a property
|
||||
|
|
|
@ -7,6 +7,7 @@ const { Clustering, errors, ipCheck } = require('arsenal');
|
|||
const arsenalHttps = require('arsenal').https;
|
||||
const { Logger } = require('werelogs');
|
||||
|
||||
const config = require('./Config');
|
||||
const routes = require('../router/routes');
|
||||
const Route = require('../router/Route');
|
||||
const Router = require('../router/Router');
|
||||
|
@ -27,12 +28,7 @@ class UtapiServer {
|
|||
constructor(worker, port, datastore, logger, config) {
|
||||
this.worker = worker;
|
||||
this.port = port;
|
||||
this.vault = config.vaultclient;
|
||||
if (!this.vault) {
|
||||
const Vault = require('./Vault');
|
||||
this.vault = new Vault(config);
|
||||
}
|
||||
this.router = new Router(config, this.vault);
|
||||
this.router = new Router(config);
|
||||
this.logger = logger;
|
||||
this.datastore = datastore;
|
||||
this.server = null;
|
||||
|
@ -75,7 +71,6 @@ class UtapiServer {
|
|||
req.socket.setNoDelay();
|
||||
const { query, path, pathname } = url.parse(req.url, true);
|
||||
const utapiRequest = new UtapiRequest()
|
||||
.setVault(this.vault)
|
||||
.setRequest(req)
|
||||
.setLog(this.logger.newRequestLogger())
|
||||
.setResponse(res)
|
||||
|
@ -219,7 +214,8 @@ class UtapiServer {
|
|||
* @property {object} params.log - logger configuration
|
||||
* @return {undefined}
|
||||
*/
|
||||
function spawn(config) {
|
||||
function spawn(params) {
|
||||
Object.assign(config, params);
|
||||
const {
|
||||
workers, redis, log, port,
|
||||
} = config;
|
||||
|
|
|
@ -23,6 +23,10 @@
|
|||
"healthChecks": {
|
||||
"allowFrom": ["127.0.0.1/8", "::1"]
|
||||
},
|
||||
"vaultd": {
|
||||
"host": "127.0.0.1",
|
||||
"port": 8500
|
||||
},
|
||||
"cacheBackend": "memory",
|
||||
"development": false,
|
||||
"nodeId": "single_node",
|
||||
|
|
|
@ -2,8 +2,6 @@ const fs = require('fs');
|
|||
const path = require('path');
|
||||
const Joi = require('@hapi/joi');
|
||||
const assert = require('assert');
|
||||
const defaults = require('./defaults.json');
|
||||
const werelogs = require('werelogs');
|
||||
|
||||
const {
|
||||
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
|
||||
|
@ -73,6 +71,7 @@ class Config {
|
|||
constructor(overrides) {
|
||||
this._basePath = path.join(__dirname, '../../');
|
||||
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
|
||||
this._defaultsPath = path.join(__dirname, 'defaults.json');
|
||||
|
||||
this.host = undefined;
|
||||
this.port = undefined;
|
||||
|
@ -90,11 +89,6 @@ class Config {
|
|||
parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
|
||||
}
|
||||
Object.assign(this, parsedConfig);
|
||||
|
||||
werelogs.configure({
|
||||
level: Config.logging.level,
|
||||
dump: Config.logging.dumpLevel,
|
||||
});
|
||||
}
|
||||
|
||||
static _readFile(path, encoding = 'utf-8') {
|
||||
|
@ -119,7 +113,7 @@ class Config {
|
|||
}
|
||||
|
||||
_loadDefaults() {
|
||||
return defaults;
|
||||
return Config._readJSON(this._defaultsPath);
|
||||
}
|
||||
|
||||
_loadUserConfig() {
|
||||
|
@ -198,10 +192,6 @@ class Config {
|
|||
`${prefix}_SENTINEL_PASSWORD`,
|
||||
config.sentinelPassword,
|
||||
);
|
||||
redisConf.password = _loadFromEnv(
|
||||
`${prefix}_PASSWORD`,
|
||||
config.password,
|
||||
);
|
||||
} else {
|
||||
redisConf.host = _loadFromEnv(
|
||||
`${prefix}_HOST`,
|
||||
|
|
|
@ -22,7 +22,6 @@ const constants = {
|
|||
'deleteBucketEncryption',
|
||||
'deleteBucketLifecycle',
|
||||
'deleteBucketReplication',
|
||||
'deleteBucketTagging',
|
||||
'deleteBucketWebsite',
|
||||
'deleteObject',
|
||||
'deleteObjectTagging',
|
||||
|
@ -35,7 +34,6 @@ const constants = {
|
|||
'getBucketObjectLock',
|
||||
'getBucketReplication',
|
||||
'getBucketVersioning',
|
||||
'getBucketTagging',
|
||||
'getBucketWebsite',
|
||||
'getObject',
|
||||
'getObjectAcl',
|
||||
|
@ -57,7 +55,6 @@ const constants = {
|
|||
'putBucketObjectLock',
|
||||
'putBucketReplication',
|
||||
'putBucketVersioning',
|
||||
'putBucketTagging',
|
||||
'putBucketWebsite',
|
||||
'putDeleteMarkerObject',
|
||||
'putObject',
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
const arsenal = require('arsenal');
|
||||
const async = require('async');
|
||||
|
||||
const metadata = require('./client');
|
||||
const { LoggerContext, logger } = require('../utils');
|
||||
const { keyVersionSplitter } = require('../constants');
|
||||
|
@ -12,14 +12,9 @@ const moduleLogger = new LoggerContext({
|
|||
module: 'metadata.client',
|
||||
});
|
||||
|
||||
const ebConfig = {
|
||||
times: 10,
|
||||
interval: retryCount => 50 * (2 ** retryCount),
|
||||
};
|
||||
|
||||
const PAGE_SIZE = 1000;
|
||||
|
||||
async function _listingWrapper(bucket, params) {
|
||||
function _listingWrapper(bucket, params) {
|
||||
return new Promise(
|
||||
(resolve, reject) => metadata.listObject(
|
||||
bucket,
|
||||
|
@ -46,7 +41,7 @@ function _listObject(bucket, prefix, hydrateFunc) {
|
|||
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt });
|
||||
res = await _listingWrapper(bucket, { ...listingParams, gt });
|
||||
} catch (error) {
|
||||
moduleLogger.error('Error during listing', { error });
|
||||
throw error;
|
||||
|
|
|
@ -6,8 +6,7 @@ const BackOff = require('backo');
|
|||
const { whilst } = require('async');
|
||||
|
||||
const errors = require('./errors');
|
||||
const { LoggerContext } = require('./utils/log');
|
||||
const { asyncOrCallback } = require('./utils/func');
|
||||
const { LoggerContext, asyncOrCallback } = require('./utils');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
module: 'redis',
|
||||
|
|
|
@ -1,13 +1,13 @@
|
|||
const promClient = require('prom-client');
|
||||
|
||||
const httpRequestsTotal = new promClient.Counter({
|
||||
name: 's3_utapi_http_requests_total',
|
||||
name: 'utapi_http_requests_total',
|
||||
help: 'Total number of HTTP requests',
|
||||
labelNames: ['action', 'code'],
|
||||
});
|
||||
|
||||
const httpRequestDurationSeconds = new promClient.Histogram({
|
||||
name: 's3_utapi_http_request_duration_seconds',
|
||||
name: 'utapi_http_request_duration_seconds',
|
||||
help: 'Duration of HTTP requests in seconds',
|
||||
labelNames: ['action', 'code'],
|
||||
// buckets for response time from 0.1ms to 60s
|
||||
|
|
|
@ -68,20 +68,20 @@ class BaseTask extends Process {
|
|||
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
|
||||
|
||||
const executionDuration = new promClient.Gauge({
|
||||
name: `s3_utapi${taskNameSnake}_duration_seconds`,
|
||||
name: `utapi${taskNameSnake}_duration_seconds`,
|
||||
help: `Execution time of the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const executionAttempts = new promClient.Counter({
|
||||
name: `s3_utapi${taskNameSnake}_attempts_total`,
|
||||
help: `Total number of attempts to execute the ${taskName} task`,
|
||||
name: `utapi${taskNameSnake}_attempts_total`,
|
||||
help: `Number of attempts to execute the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const executionFailures = new promClient.Counter({
|
||||
name: `s3_utapi${taskNameSnake}_failures_total`,
|
||||
help: `Total number of failures executing the ${taskName} task`,
|
||||
name: `utapi${taskNameSnake}_failures_total`,
|
||||
help: `Number of failures executing the ${taskName} task`,
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
|
|
|
@ -24,14 +24,14 @@ class CreateCheckpoint extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 's3_utapi_create_checkpoint_created_total',
|
||||
help: 'Total number of checkpoints created',
|
||||
name: 'utapi_create_checkpoint_created_total',
|
||||
help: 'Number of checkpoints created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const getLastCheckpoint = this._getLastCheckpoint.bind(this);
|
||||
const lastCheckpoint = new promClient.Gauge({
|
||||
name: 's3_utapi_create_checkpoint_last_checkpoint_seconds',
|
||||
name: 'utapi_create_checkpoint_last_checkpoint_seconds',
|
||||
help: 'Timestamp of the last successfully created checkpoint',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
async collect() {
|
||||
|
|
|
@ -24,14 +24,14 @@ class CreateSnapshot extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 's3_utapi_create_snapshot_created_total',
|
||||
help: 'Total number of snapshots created',
|
||||
name: 'utapi_create_snapshot_created_total',
|
||||
help: 'Number of snapshots created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const getLastSnapshot = this._getLastSnapshot.bind(this);
|
||||
const lastSnapshot = new promClient.Gauge({
|
||||
name: 's3_utapi_create_snapshot_last_snapshot_seconds',
|
||||
name: 'utapi_create_snapshot_last_snapshot_seconds',
|
||||
help: 'Timestamp of the last successfully created snapshot',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
async collect() {
|
||||
|
|
|
@ -52,31 +52,31 @@ class MonitorDiskUsage extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const isLocked = new promClient.Gauge({
|
||||
name: 's3_utapi_monitor_disk_usage_is_locked',
|
||||
name: 'utapi_monitor_disk_usage_is_locked',
|
||||
help: 'Indicates whether the monitored warp 10 has had writes disabled',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const leveldbBytes = new promClient.Gauge({
|
||||
name: 's3_utapi_monitor_disk_usage_leveldb_bytes',
|
||||
name: 'utapi_monitor_disk_usage_leveldb_bytes',
|
||||
help: 'Total bytes used by warp 10 leveldb',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const datalogBytes = new promClient.Gauge({
|
||||
name: 's3_utapi_monitor_disk_usage_datalog_bytes',
|
||||
name: 'utapi_monitor_disk_usage_datalog_bytes',
|
||||
help: 'Total bytes used by warp 10 datalog',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const hardLimitRatio = new promClient.Gauge({
|
||||
name: 's3_utapi_monitor_disk_usage_hard_limit_ratio',
|
||||
name: 'utapi_monitor_disk_usage_hard_limit_ratio',
|
||||
help: 'Percent of the hard limit used by warp 10',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const hardLimitSetting = new promClient.Gauge({
|
||||
name: 's3_utapi_monitor_disk_usage_hard_limit_bytes',
|
||||
name: 'utapi_monitor_disk_usage_hard_limit_bytes',
|
||||
help: 'The hard limit setting in bytes',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
|
|
@ -32,26 +32,26 @@ class IngestShardTask extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const ingestedTotal = new promClient.Counter({
|
||||
name: 's3_utapi_ingest_shard_task_ingest_total',
|
||||
help: 'Total number of metrics ingested',
|
||||
name: 'utapi_ingest_shard_task_ingest_total',
|
||||
help: 'Number of metrics ingested',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const ingestedSlow = new promClient.Counter({
|
||||
name: 's3_utapi_ingest_shard_task_slow_total',
|
||||
help: 'Total number of slow metrics ingested',
|
||||
name: 'utapi_ingest_shard_task_slow_total',
|
||||
help: 'Number of slow metrics ingested',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const ingestedShards = new promClient.Counter({
|
||||
name: 's3_utapi_ingest_shard_task_shard_ingest_total',
|
||||
help: 'Total number of metric shards ingested',
|
||||
name: 'utapi_ingest_shard_task_shard_ingest_total',
|
||||
help: 'Number of metric shards ingested',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
const shardAgeTotal = new promClient.Counter({
|
||||
name: 's3_utapi_ingest_shard_task_shard_age_total',
|
||||
help: 'Total aggregated age of shards',
|
||||
name: 'utapi_ingest_shard_task_shard_age_total',
|
||||
help: 'Aggregated age of shards',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
|
|
|
@ -31,12 +31,6 @@ class ReindexTask extends BaseTask {
|
|||
this._defaultLag = 0;
|
||||
const eventFilters = (config && config.filter) || {};
|
||||
this._shouldReindex = buildFilterChain((config && config.filter) || {});
|
||||
// exponential backoff: max wait = 50 * 2 ^ 10 milliseconds ~= 51 seconds
|
||||
this.ebConfig = {
|
||||
times: 10,
|
||||
interval: retryCount => 50 * (2 ** retryCount),
|
||||
};
|
||||
|
||||
if (Object.keys(eventFilters).length !== 0) {
|
||||
logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters);
|
||||
}
|
||||
|
@ -164,6 +158,7 @@ class ReindexTask extends BaseTask {
|
|||
if (this._program.bucket.length) {
|
||||
return this._program.bucket.map(name => ({ name }));
|
||||
}
|
||||
|
||||
return metadata.listBuckets();
|
||||
}
|
||||
|
||||
|
@ -185,8 +180,8 @@ class ReindexTask extends BaseTask {
|
|||
let mpuTotal;
|
||||
|
||||
try {
|
||||
bktTotal = await async.retryable(this.ebConfig, ReindexTask._indexBucket)(bucket.name);
|
||||
mpuTotal = await async.retryable(this.ebConfig, ReindexTask._indexMpuBucket)(mpuBucket);
|
||||
bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
|
||||
mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
'failed bucket reindex. any associated account will be skipped',
|
||||
|
|
|
@ -24,8 +24,8 @@ class RepairTask extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const created = new promClient.Counter({
|
||||
name: 's3_utapi_repair_task_created_total',
|
||||
help: 'Total number of repair records created',
|
||||
name: 'utapi_repair_task_created_total',
|
||||
help: 'Number of repair records created',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
||||
|
|
|
@ -1,6 +1,14 @@
|
|||
const werelogs = require('werelogs');
|
||||
const config = require('../config');
|
||||
const { comprehend } = require('./func');
|
||||
|
||||
const loggerConfig = {
|
||||
level: config.logging.level,
|
||||
dump: config.logging.dumpLevel,
|
||||
};
|
||||
|
||||
werelogs.configure(loggerConfig);
|
||||
|
||||
const rootLogger = new werelogs.Logger('Utapi');
|
||||
|
||||
class LoggerContext {
|
||||
|
@ -70,6 +78,8 @@ class LoggerContext {
|
|||
}
|
||||
}
|
||||
|
||||
rootLogger.debug('logger initialized', { loggerConfig });
|
||||
|
||||
function buildRequestLogger(req) {
|
||||
let reqUids = [];
|
||||
if (req.headers['x-scal-request-uids'] !== undefined) {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const assert = require('assert');
|
||||
const { auth, policies } = require('arsenal');
|
||||
const vaultclient = require('vaultclient');
|
||||
const config = require('../config');
|
||||
const errors = require('../errors');
|
||||
/**
|
||||
|
@ -8,17 +9,9 @@ const errors = require('../errors');
|
|||
*/
|
||||
|
||||
class VaultWrapper extends auth.Vault {
|
||||
create(config) {
|
||||
if (config.vaultd.host) {
|
||||
return new VaultWrapper(config);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
constructor(options) {
|
||||
let client;
|
||||
const { host, port } = options.vaultd;
|
||||
const vaultclient = require('vaultclient');
|
||||
if (options.tls) {
|
||||
const { key, cert, ca } = options.tls;
|
||||
client = new vaultclient.Client(host, port, true, key, cert,
|
||||
|
@ -126,7 +119,7 @@ class VaultWrapper extends auth.Vault {
|
|||
}
|
||||
}
|
||||
|
||||
const vault = VaultWrapper.create(config);
|
||||
const vault = new VaultWrapper(config);
|
||||
auth.setHandler(vault);
|
||||
|
||||
module.exports = {
|
||||
|
|
18
package.json
18
package.json
|
@ -3,7 +3,7 @@
|
|||
"engines": {
|
||||
"node": ">=16"
|
||||
},
|
||||
"version": "8.1.15",
|
||||
"version": "7.10.7",
|
||||
"description": "API for tracking resource utilization and reporting metrics",
|
||||
"main": "index.js",
|
||||
"repository": {
|
||||
|
@ -19,12 +19,13 @@
|
|||
"dependencies": {
|
||||
"@hapi/joi": "^17.1.1",
|
||||
"@senx/warp10": "^1.0.14",
|
||||
"arsenal": "git+https://git.yourcmc.ru/vitalif/zenko-arsenal.git#development/8.1",
|
||||
"arsenal": "git+https://github.com/scality/Arsenal#7.10.29",
|
||||
"async": "^3.2.0",
|
||||
"aws-sdk": "^2.1005.0",
|
||||
"aws4": "^1.8.0",
|
||||
"backo": "^1.1.0",
|
||||
"body-parser": "^1.19.0",
|
||||
"bucketclient": "scality/bucketclient#7.10.4",
|
||||
"byte-size": "^7.0.0",
|
||||
"commander": "^5.1.0",
|
||||
"cron-parser": "^2.15.0",
|
||||
|
@ -37,16 +38,17 @@
|
|||
"needle": "^2.5.0",
|
||||
"node-schedule": "^1.3.2",
|
||||
"oas-tools": "^2.2.2",
|
||||
"prom-client": "14.2.0",
|
||||
"prom-client": "^13.1.0",
|
||||
"uuid": "^3.3.2",
|
||||
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1"
|
||||
"vaultclient": "scality/vaultclient#7.10.8",
|
||||
"werelogs": "scality/werelogs#8.1.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"eslint": "^8.14.0",
|
||||
"eslint-config-airbnb-base": "^15.0.0",
|
||||
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git",
|
||||
"eslint": "6.0.1",
|
||||
"eslint-config-airbnb": "17.1.0",
|
||||
"eslint-config-scality": "scality/Guidelines#7.10.2",
|
||||
"eslint-plugin-import": "^2.18.0",
|
||||
"mocha": ">=3.1.2",
|
||||
"mocha": "^3.0.2",
|
||||
"nodemon": "^2.0.4",
|
||||
"protobufjs": "^6.10.1",
|
||||
"sinon": "^9.0.2"
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
redis==5.0.3
|
||||
requests==2.31.0
|
|
@ -3,16 +3,17 @@ const assert = require('assert');
|
|||
const url = require('url');
|
||||
const { auth, errors, policies } = require('arsenal');
|
||||
const safeJsonParse = require('../utils/safeJsonParse');
|
||||
const Vault = require('../lib/Vault');
|
||||
|
||||
class Router {
|
||||
/**
|
||||
* @constructor
|
||||
* @param {Config} config - Config instance
|
||||
*/
|
||||
constructor(config, vault) {
|
||||
constructor(config) {
|
||||
this._service = config.component;
|
||||
this._routes = {};
|
||||
this._vault = vault;
|
||||
this._vault = new Vault(config);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -265,10 +266,6 @@ class Router {
|
|||
*/
|
||||
_processSecurityChecks(utapiRequest, route, cb) {
|
||||
const log = utapiRequest.getLog();
|
||||
if (process.env.UTAPI_AUTH === 'false') {
|
||||
// Zenko route request does not need to go through Vault
|
||||
return this._startRequest(utapiRequest, route, cb);
|
||||
}
|
||||
return this._authSquared(utapiRequest, err => {
|
||||
if (err) {
|
||||
log.trace('error from vault', { errors: err });
|
||||
|
|
21
server.js
21
server.js
|
@ -1,21 +1,4 @@
|
|||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
|
||||
const Config = require('./lib/Config');
|
||||
const config = require('./lib/Config');
|
||||
const server = require('./lib/server');
|
||||
|
||||
/*
|
||||
* By default, the config file is "config.json" at the root.
|
||||
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
|
||||
*/
|
||||
const cfgpath = process.env.UTAPI_CONFIG_FILE || (__dirname+'/config.json');
|
||||
|
||||
let cfg;
|
||||
try {
|
||||
cfg = JSON.parse(fs.readFileSync(cfgpath, { encoding: 'utf-8' }));
|
||||
} catch (err) {
|
||||
throw new Error(`could not parse config file: ${err.message}`);
|
||||
}
|
||||
|
||||
cfg.component = 's3';
|
||||
server(new Config(cfg));
|
||||
server(Object.assign({}, config, { component: 's3' }));
|
||||
|
|
|
@ -0,0 +1,448 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import concurrent.futures as futures
|
||||
import datetime
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
import multiprocessing
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import urllib
|
||||
from collections import namedtuple
|
||||
|
||||
import requests
|
||||
from requests import ConnectionError, HTTPError, Timeout
|
||||
|
||||
_log = logging.getLogger('utapi-svc-compute')
|
||||
_log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
USERS_BUCKET = 'users..bucket'
|
||||
|
||||
def _fatal_error(msg, rc=1):
|
||||
_log.error(msg)
|
||||
sys.exit(rc)
|
||||
|
||||
def path_type(string):
|
||||
return pathlib.Path(os.path.expanduser(string)).resolve()
|
||||
|
||||
def get_args():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog=pathlib.Path(sys.argv[0]).name,
|
||||
description='Compute service level metrics for Utapiv2',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
|
||||
|
||||
parser.add_argument('-c', '--config',
|
||||
default='/scality/ssd01/s3/scality-utapi/conf/config.json',
|
||||
type=path_type,
|
||||
help='Specify an alternate config file')
|
||||
|
||||
parser.add_argument('-r', '--max-retries', default=2, type=int, help='Max retries before failing a request to a external service request')
|
||||
parser.add_argument('-p', '--parallel-queries', default=5, type=int, help='Max number of parallel queries to and warp 10')
|
||||
parser.add_argument('-j', '--json', action='store_true', help='Output raw reports in json format')
|
||||
parser.add_argument('--output', default='.', type=path_type, help='Write report to this directory')
|
||||
parser.add_argument('--debug', action='store_true', help='Enable debug level logging')
|
||||
parser.add_argument('--dry-run', action='store_true', help="Don't do any computation. Only validate and print the configuration.")
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
ScriptConfig = namedtuple('ScriptConfig', ['warp10', 'bucketd', 'vault'])
|
||||
Warp10Conf = namedtuple('Warp10Conf', ['host', 'port', 'nodeId', 'read_token'])
|
||||
|
||||
def get_config(args):
|
||||
if not args.config.exists():
|
||||
_fatal_error('Config file does not exist: {}'.format(args.config))
|
||||
with open(args.config) as f:
|
||||
try:
|
||||
utapi_conf = json.load(f)
|
||||
except Exception as e:
|
||||
_log.debug(e)
|
||||
_fatal_error('Error reading utapi config file at: {}'.format(args.config))
|
||||
|
||||
try:
|
||||
read_token = utapi_conf['warp10']['readToken']
|
||||
write_token = utapi_conf['warp10']['writeToken']
|
||||
warp10_conf = [Warp10Conf(read_token=read_token, **server) for server in utapi_conf['warp10']['hosts']]
|
||||
except Exception as e:
|
||||
_log.debug(e)
|
||||
_fatal_error('Utapi config does not contain a valid "warp10" section')
|
||||
|
||||
try:
|
||||
bucketd_conf = utapi_conf['bucketd'][0]
|
||||
except Exception as e:
|
||||
_log.debug(e)
|
||||
_fatal_error('Utapi config does not contain a valid "bucketd" section')
|
||||
|
||||
try:
|
||||
vault_host = utapi_conf['vaultd']['host']
|
||||
vault_port = utapi_conf['vaultd']['port']
|
||||
vault_addr = 'http://{}:{}'.format(vault_host, vault_port)
|
||||
except Exception as e:
|
||||
_log.debug(e)
|
||||
_fatal_error('Utapi config does not contain a valid "vaultd" section')
|
||||
|
||||
return ScriptConfig(warp10=warp10_conf, bucketd=bucketd_conf, vault=vault_addr)
|
||||
|
||||
Bucket = namedtuple('Bucket', ['account', 'name'])
|
||||
class MaxRetriesReached(Exception):
|
||||
def __init__(self, url):
|
||||
super().__init__('Max retries reached for request to %s'%url)
|
||||
|
||||
class InvalidListing(Exception):
|
||||
def __init__(self, bucket):
|
||||
super().__init__('Invalid contents found while listing bucket %s'%bucket)
|
||||
|
||||
class BucketDClient:
|
||||
|
||||
'''Performs Listing calls against bucketd'''
|
||||
__url_format = 'http://{addr}/default/bucket/{bucket}'
|
||||
__headers = {'x-scal-request-uids': 'utapi-compute-service-lvl'}
|
||||
|
||||
def __init__(self, bucketd_addr=None, max_retries=2):
|
||||
self._bucketd_addr = bucketd_addr
|
||||
self._max_retries = max_retries
|
||||
self._session = requests.Session()
|
||||
|
||||
def _do_req(self, url, check_500=True, **kwargs):
|
||||
# Add 1 for the initial request
|
||||
for x in range(self._max_retries + 1):
|
||||
try:
|
||||
resp = self._session.get(url, timeout=30, verify=False, headers=self.__headers, **kwargs)
|
||||
if check_500 and resp.status_code == 500:
|
||||
_log.warning('500 from bucketd, sleeping 15 secs')
|
||||
time.sleep(15)
|
||||
continue
|
||||
return resp
|
||||
except (Timeout, ConnectionError) as e:
|
||||
_log.exception(e)
|
||||
_log.error('Error during listing, sleeping 5 secs %s'%url)
|
||||
time.sleep(5)
|
||||
|
||||
raise MaxRetriesReached(url)
|
||||
|
||||
def _list_bucket(self, bucket, **kwargs):
|
||||
'''
|
||||
Lists a bucket lazily until "empty"
|
||||
bucket: name of the bucket
|
||||
kwargs: url parameters key=value
|
||||
|
||||
To support multiple next marker keys and param encoding, a function can
|
||||
be passed as a parameters value. It will be call with the json decode
|
||||
response body as its only argument and is expected to return the
|
||||
parameters value. On the first request the function will be called with
|
||||
`None` and should return its initial value. Return `None` for the param to be excluded.
|
||||
'''
|
||||
url = self.__url_format.format(addr=self._bucketd_addr, bucket=bucket)
|
||||
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
|
||||
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
|
||||
is_truncated = True # Set to True for first loop
|
||||
payload = None
|
||||
while is_truncated:
|
||||
params = static_params.copy() # Use a copy of the static params for a base
|
||||
for key, func in dynamic_params.items():
|
||||
params[key] = func(payload) # Call each of our dynamic params with the previous payload
|
||||
try:
|
||||
_log.debug('listing bucket bucket: %s params: %s'%(
|
||||
bucket, ', '.join('%s=%s'%p for p in params.items())))
|
||||
resp = self._do_req(url, params=params)
|
||||
if resp.status_code == 200:
|
||||
payload = resp.json()
|
||||
except ValueError as e:
|
||||
_log.exception(e)
|
||||
_log.error('Invalid listing response body! bucket:%s params:%s'%(
|
||||
bucket, ', '.join('%s=%s'%p for p in params.items())))
|
||||
raise
|
||||
except MaxRetriesReached:
|
||||
_log.error('Max retries reached listing bucket:%s'%bucket)
|
||||
raise
|
||||
except Exception as e:
|
||||
_log.exception(e)
|
||||
_log.error('Unhandled exception during listing! bucket:%s params:%s'%(
|
||||
bucket, ', '.join('%s=%s'%p for p in params.items())))
|
||||
raise
|
||||
yield resp.status_code, payload
|
||||
if isinstance(payload, dict):
|
||||
is_truncated = payload.get('IsTruncated', False)
|
||||
else:
|
||||
is_truncated = len(payload) > 0
|
||||
|
||||
def list_buckets(self):
|
||||
|
||||
def get_next_marker(p):
|
||||
if p is None:
|
||||
return ''
|
||||
return p.get('Contents', [{}])[-1].get('key', '')
|
||||
|
||||
params = {
|
||||
'delimiter': '',
|
||||
'maxKeys': 1000,
|
||||
'marker': get_next_marker
|
||||
}
|
||||
for _, payload in self._list_bucket(USERS_BUCKET, **params):
|
||||
buckets = []
|
||||
for result in payload['Contents']:
|
||||
match = re.match('(\w+)..\|..(\w+.*)', result['key'])
|
||||
bucket = Bucket(*match.groups())
|
||||
buckets.append(bucket)
|
||||
if buckets:
|
||||
yield buckets
|
||||
|
||||
def query_warp10(url, payload, retries=5):
|
||||
for i in range(1, retries + 1):
|
||||
try:
|
||||
resp = requests.post(url, payload)
|
||||
if resp.status_code != 200:
|
||||
_log.error('Error fetching metrics from warp 10')
|
||||
if hasattr(resp, 'text'):
|
||||
_log.debug(resp.text)
|
||||
continue
|
||||
data = resp.json()
|
||||
num_objects = data[0].get('objD')
|
||||
bytes_stored = data[0].get('sizeD')
|
||||
return num_objects, bytes_stored
|
||||
except Exception as e:
|
||||
_log.exception('Error during warp 10 request', e)
|
||||
continue
|
||||
raise MaxRetriesReached(url)
|
||||
|
||||
def get_metrics(warp10s, bucket, timestamp, retries=5):
|
||||
num_objects = 0
|
||||
bytes_stored = 0
|
||||
for server in warp10s:
|
||||
auth = json.dumps({ 'read': server.read_token })
|
||||
op_info = json.dumps({
|
||||
'end': timestamp,
|
||||
'labels': { 'bck': bucket.name },
|
||||
'node': server.nodeId
|
||||
})
|
||||
payload = "'{}' '{}' @utapi/getMetricsAt".format(auth, op_info).encode('utf-8')
|
||||
url = 'http://{}:{}/api/v0/exec'.format(server.host, server.port)
|
||||
try:
|
||||
node_num_objects, node_bytes_stored = query_warp10(url, payload)
|
||||
num_objects += node_num_objects
|
||||
bytes_stored += node_bytes_stored
|
||||
except MaxRetriesReached as e:
|
||||
_log.exception('Error fetching metrics for bucket {} from {}'.format(bucket.name, server.nodeId), e)
|
||||
raise e
|
||||
return num_objects, bytes_stored
|
||||
|
||||
def get_account_data(vault, canon_ids):
|
||||
_log.debug('Fetching account info for canonicalId {}'.format(canon_ids) )
|
||||
payload = {
|
||||
'Action': 'GetAccounts',
|
||||
'Version': '2010-05-08',
|
||||
'canonicalIds': canon_ids
|
||||
}
|
||||
resp = requests.get(vault, params=payload)
|
||||
return resp.json()
|
||||
|
||||
|
||||
def chunker(iterable, chunksize):
|
||||
_iterable = iter(iterable)
|
||||
while True:
|
||||
chunk = itertools.islice(_iterable, chunksize)
|
||||
try:
|
||||
item = next(chunk)
|
||||
except StopIteration:
|
||||
return
|
||||
yield itertools.chain([item], chunk)
|
||||
|
||||
def print_config(config):
|
||||
print('Warp 10 Hosts - NodeId | Address')
|
||||
for host in config.warp10:
|
||||
print('{} | {}:{}'.format(host.nodeId, host.host, host.port))
|
||||
print('\nBucketD Host')
|
||||
print(config.bucketd)
|
||||
|
||||
html_header = '''<!DOCTYPE html>
|
||||
<html>
|
||||
<body>
|
||||
<style>
|
||||
body {
|
||||
background: #E7DED9;
|
||||
color: #3C3431;
|
||||
}
|
||||
|
||||
table, th, td {
|
||||
border: 1px solid black;
|
||||
border-collapse: collapse;
|
||||
}
|
||||
|
||||
table {
|
||||
width: 75%;
|
||||
margin: 0.25em auto;
|
||||
}
|
||||
|
||||
thead {
|
||||
background: #E1C391;
|
||||
}
|
||||
|
||||
tbody {
|
||||
background: #FDF4E3;
|
||||
}
|
||||
|
||||
tbody > tr:hover {background-color: #D6EEEE;}
|
||||
|
||||
td {
|
||||
padding: 0.1em 0.5em;
|
||||
}
|
||||
|
||||
h2, h3 {
|
||||
margin: auto;
|
||||
width: 75%;
|
||||
font-weight: normal;
|
||||
display: block;
|
||||
}
|
||||
|
||||
</style>
|
||||
'''
|
||||
html_footer = '<span>Generated on {}</span>\n</body></html>'
|
||||
|
||||
_heading_tmpl = '<thead><tr>\n{}\n</tr></thead>\n'
|
||||
def get_heading_row(*args):
|
||||
filler = '\n'.join('<td>{}</td>'.format(a) for a in args)
|
||||
return _heading_tmpl.format(filler)
|
||||
|
||||
_data_tmpl = '<tr>\n{}\n</tr>\n'
|
||||
def get_data_row(*args):
|
||||
filler = '\n'.join('<td>{}</td>'.format(a) for a in args)
|
||||
return _data_tmpl.format(filler)
|
||||
|
||||
def get_table_lines(heading, data):
|
||||
yield '<table>\n'
|
||||
yield get_heading_row(*heading)
|
||||
yield '<tdata>\n'
|
||||
for row in data:
|
||||
yield get_data_row(*row)
|
||||
yield '</tdata>\n'
|
||||
yield '</table>\n'
|
||||
|
||||
def get_heading(text, size=2):
|
||||
return '<h{}>{}</h{}>\n'.format(size, text, size)
|
||||
|
||||
_bucket_reports = {}
|
||||
_account_reports = {}
|
||||
_service_report = { 'obj_count': 0, 'bytes_stored': 0 }
|
||||
# Create a report for the given bucket
|
||||
# Create/Update the report for the bucket's account canonicalId
|
||||
# Update the service report
|
||||
def update_report(bucket, obj_count, bytes_stored):
|
||||
if bucket.account not in _bucket_reports:
|
||||
_bucket_reports[bucket.account] = dict()
|
||||
_bucket_reports[bucket.account][bucket.name] = { 'obj_count': obj_count, 'bytes_stored': bytes_stored }
|
||||
if bucket.account not in _account_reports:
|
||||
_account_reports[bucket.account] = { 'obj_count': obj_count, 'bytes_stored': bytes_stored }
|
||||
else:
|
||||
existing = _account_reports[bucket.account]
|
||||
_account_reports[bucket.account] = {
|
||||
'obj_count': existing['obj_count'] + obj_count,
|
||||
'bytes_stored': existing['bytes_stored'] + bytes_stored
|
||||
}
|
||||
_service_report['obj_count'] = _service_report['obj_count'] + obj_count
|
||||
_service_report['bytes_stored'] = _service_report['bytes_stored'] + bytes_stored
|
||||
|
||||
def to_human(bytes_stored):
|
||||
for unit in ['B', 'KiB', 'MiB', 'GiB', 'TiB']:
|
||||
if abs(bytes_stored) < 1024.0:
|
||||
return '{0:3.2f}{1}'.format(bytes_stored, unit)
|
||||
bytes_stored /= 1024.0
|
||||
return "{0:.2f}PiB".format(bytes_stored)
|
||||
|
||||
BucketReport = namedtuple('BucketContents', ['name', 'obj_count', 'bytes_stored', 'human'])
|
||||
AccountReport = namedtuple('AccountReport', ['name', 'arn', 'obj_count', 'bytes_stored', 'human'])
|
||||
ServiceReport = namedtuple('ServiceReport', ['obj_count', 'bytes_stored', 'human'])
|
||||
|
||||
def get_service_report():
|
||||
return ServiceReport(human=to_human(_service_report['bytes_stored']), **_service_report)
|
||||
|
||||
def get_account_reports(account_info):
|
||||
for canonical_id, counters in _account_reports.items():
|
||||
name = account_info[canonical_id]['name']
|
||||
arn = account_info[canonical_id]['arn']
|
||||
human_size = to_human(counters['bytes_stored'])
|
||||
yield AccountReport(name=name, arn=arn, human=human_size, **counters)
|
||||
|
||||
def get_bucket_reports(canonical_id):
|
||||
for name, counters in _bucket_reports[canonical_id].items():
|
||||
human_size = to_human(counters['bytes_stored'])
|
||||
yield BucketReport(name=name, human=human_size, **counters)
|
||||
|
||||
if __name__ == '__main__':
|
||||
args = get_args()
|
||||
config = get_config(args)
|
||||
if args.debug:
|
||||
logging.basicConfig(level=logging.DEBUG, format=_log_format)
|
||||
else:
|
||||
logging.basicConfig(level=logging.INFO, format=_log_format)
|
||||
|
||||
if args.dry_run:
|
||||
print_config(config)
|
||||
print('\nOutput Path\n{}'.format(args.output[0]))
|
||||
sys.exit(0)
|
||||
|
||||
bucket_client = BucketDClient(config.bucketd, args.max_retries)
|
||||
|
||||
generation_timestamp = datetime.datetime.utcnow()
|
||||
microtimestamp = int(generation_timestamp.timestamp()) * 1000000
|
||||
account_info = {}
|
||||
|
||||
_log.info('Starting report computation')
|
||||
|
||||
# Take the buckets from a listing response from bucketd and submit it to the process pool
|
||||
# As the jobs are completed update the reports
|
||||
# All buckets are processed before bucketed is queried again
|
||||
failed_accounts = set()
|
||||
with futures.ProcessPoolExecutor(args.parallel_queries) as executor:
|
||||
for batch in bucket_client.list_buckets():
|
||||
jobs = { executor.submit(get_metrics, config.warp10, bucket, microtimestamp, args.max_retries): bucket for bucket in batch }
|
||||
for job in futures.as_completed(jobs.keys()):
|
||||
bucket = jobs[job]
|
||||
try:
|
||||
num_objects, bytes_stored = job.result()
|
||||
except Exception as e:
|
||||
_log.exception('Error fetching metrics for bucket {}'.format(bucket.name))
|
||||
else:
|
||||
_log.info('Updating report for bucket {}'.format(bucket.name))
|
||||
update_report(bucket, num_objects, bytes_stored)
|
||||
# Account information is fetched lazily on first encounter
|
||||
if bucket.account not in account_info:
|
||||
try:
|
||||
account_info[bucket.account] = get_account_data(config.vault, [bucket.account])[0]
|
||||
except Exception as e:
|
||||
_log.exception('Failed to fetch account information for canonicalId {}'.format(bucket.name))
|
||||
_log.error('Report will not include name and arn for account.')
|
||||
|
||||
ext = 'json' if args.json else 'html'
|
||||
output_path = args.output.joinpath('utapi-service-report-{}.{}'.format(generation_timestamp.strftime('%Y-%m-%dT%H-%M-%S'), ext))
|
||||
with open(output_path, 'w') as f:
|
||||
if args.json:
|
||||
_log.debug('writing json report')
|
||||
json.dump({
|
||||
'service': get_service_report()._asdict(),
|
||||
'account': [r._asdict() for r in get_account_reports(account_info)],
|
||||
'bucket': {account_info[cid]['arn']: [r._asdict() for r in get_bucket_reports(cid)] for cid in account_info.keys() }
|
||||
}, f)
|
||||
else:
|
||||
_log.debug('writing html report')
|
||||
f.write(html_header)
|
||||
f.write(get_heading('Service Totals'))
|
||||
for line in get_table_lines(['Total Object Count', 'Total Bytes Stored', 'Human Readable Size'], [get_service_report()]):
|
||||
f.write(line)
|
||||
f.write('<br>')
|
||||
|
||||
f.write(get_heading('Breakdown by Account'))
|
||||
for line in get_table_lines(['Account', 'Arn', 'Objects Count', 'Bytes Stored', 'Human Readable Size'], get_account_reports(account_info)):
|
||||
f.write(line)
|
||||
f.write('<br>')
|
||||
|
||||
f.write(get_heading('Breakdown by Bucket'))
|
||||
for canonical_id, acc_info in account_info.items():
|
||||
f.write(get_heading('Account: {}'.format(acc_info['name']), 3))
|
||||
for line in get_table_lines(['Bucket', 'Object Count', 'Bytes Stored', 'Human Readable Size'], get_bucket_reports(canonical_id)):
|
||||
f.write(line)
|
||||
f.write('<br>')
|
||||
f.write(html_footer.format(generation_timestamp.isoformat()))
|
||||
_log.info('Finished generating report')
|
|
@ -41,7 +41,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
|
|||
assert(lockSpy.calledOnce);
|
||||
assert(unlockSpy.notCalled);
|
||||
assert(execStub.calledOnce);
|
||||
await assertMetricValue('s3_utapi_monitor_disk_usage_hard_limit_bytes', 1);
|
||||
await assertMetricValue('utapi_monitor_disk_usage_hard_limit_bytes', 1);
|
||||
});
|
||||
|
||||
it('should trigger a database unlock if below the limit', async () => {
|
||||
|
|
|
@ -15,7 +15,7 @@ class CustomTask extends BaseTask {
|
|||
// eslint-disable-next-line class-methods-use-this
|
||||
_registerMetricHandlers() {
|
||||
const foo = new promClient.Gauge({
|
||||
name: 's3_utapi_custom_task_foo_total',
|
||||
name: 'utapi_custom_task_foo_total',
|
||||
help: 'Count of foos',
|
||||
labelNames: ['origin', 'containerName'],
|
||||
});
|
||||
|
@ -58,26 +58,26 @@ describe('Test BaseTask metrics', () => {
|
|||
|
||||
it('should push metrics for a task execution', async () => {
|
||||
await task.execute();
|
||||
const timeValues = await getMetricValues('s3_utapi_custom_task_duration_seconds');
|
||||
const timeValues = await getMetricValues('utapi_custom_task_duration_seconds');
|
||||
assert.strictEqual(timeValues.length, 1);
|
||||
|
||||
const attemptsValues = await getMetricValues('s3_utapi_custom_task_attempts_total');
|
||||
const attemptsValues = await getMetricValues('utapi_custom_task_attempts_total');
|
||||
assert.deepStrictEqual(attemptsValues, [{ value: 1, labels: {} }]);
|
||||
|
||||
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
|
||||
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
||||
assert.deepStrictEqual(failuresValues, []);
|
||||
});
|
||||
|
||||
it('should push metrics for a failed task execution', async () => {
|
||||
sinon.replace(task, '_execute', sinon.fake.rejects('forced failure'));
|
||||
await task.execute();
|
||||
const failuresValues = await getMetricValues('s3_utapi_custom_task_failures_total');
|
||||
const failuresValues = await getMetricValues('utapi_custom_task_failures_total');
|
||||
assert.deepStrictEqual(failuresValues, [{ value: 1, labels: {} }]);
|
||||
});
|
||||
|
||||
it('should allow custom handlers to be registered', async () => {
|
||||
await task.execute();
|
||||
const fooValues = await getMetricValues('s3_utapi_custom_task_foo_total');
|
||||
const fooValues = await getMetricValues('utapi_custom_task_foo_total');
|
||||
assert.deepStrictEqual(fooValues, [{ value: 1, labels: {} }]);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -80,7 +80,7 @@ describe('Test CreateCheckpoint', function () {
|
|||
|
||||
assert.strictEqual(series.length, 3);
|
||||
assertResults(totals, series);
|
||||
await assertMetricValue('s3_utapi_create_checkpoint_created_total', series.length);
|
||||
await assertMetricValue('utapi_create_checkpoint_created_total', series.length);
|
||||
});
|
||||
|
||||
it('should only include events not in an existing checkpoint', async () => {
|
||||
|
|
|
@ -88,7 +88,7 @@ describe('Test CreateSnapshot', function () {
|
|||
|
||||
assert.strictEqual(series.length, 3);
|
||||
assertResults(totals, series);
|
||||
await assertMetricValue('s3_utapi_create_snapshot_created_total', series.length);
|
||||
await assertMetricValue('utapi_create_snapshot_created_total', series.length);
|
||||
});
|
||||
|
||||
it('should create a snapshot from more than one checkpoint', async () => {
|
||||
|
|
|
@ -69,8 +69,8 @@ describe('Test MonitorDiskUsage', () => {
|
|||
const expectedTotalSize = expectedSingleSize * 2;
|
||||
assert.strictEqual(task.usage, expectedTotalSize);
|
||||
// Should equal the usage minus the empty datalog
|
||||
await assertMetricValue('s3_utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
|
||||
await assertMetricValue('s3_utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
|
||||
await assertMetricValue('utapi_monitor_disk_usage_leveldb_bytes', expectedSingleSize);
|
||||
await assertMetricValue('utapi_monitor_disk_usage_datalog_bytes', expectedSingleSize);
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -92,9 +92,9 @@ describe('Test IngestShards', function () {
|
|||
'@utapi/decodeEvent',
|
||||
);
|
||||
assertResults(events, series);
|
||||
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
|
||||
await assertMetricValue('s3_utapi_ingest_shard_task_shard_ingest_total', 1);
|
||||
const metricValues = await getMetricValues('s3_utapi_ingest_shard_task_shard_age_total');
|
||||
await assertMetricValue('utapi_ingest_shard_task_ingest_total', events.length);
|
||||
await assertMetricValue('utapi_ingest_shard_task_shard_ingest_total', 1);
|
||||
const metricValues = await getMetricValues('utapi_ingest_shard_task_shard_age_total');
|
||||
assert.strictEqual(metricValues.length, 1);
|
||||
const [metric] = metricValues;
|
||||
assert(metric.value > 0);
|
||||
|
@ -118,7 +118,7 @@ describe('Test IngestShards', function () {
|
|||
);
|
||||
|
||||
assertResults(events, series);
|
||||
await assertMetricValue('s3_utapi_ingest_shard_task_ingest_total', events.length);
|
||||
await assertMetricValue('utapi_ingest_shard_task_ingest_total', events.length);
|
||||
});
|
||||
|
||||
it('should ingest old metrics as repair', async () => {
|
||||
|
@ -138,7 +138,7 @@ describe('Test IngestShards', function () {
|
|||
'@utapi/decodeEvent',
|
||||
);
|
||||
assertResults(events, series);
|
||||
await assertMetricValue('s3_utapi_ingest_shard_task_slow_total', events.length);
|
||||
await assertMetricValue('utapi_ingest_shard_task_slow_total', events.length);
|
||||
});
|
||||
|
||||
it('should strip the event uuid during ingestion', async () => {
|
||||
|
@ -170,6 +170,7 @@ describe('Test IngestShards', function () {
|
|||
const results = await warp10.fetch({
|
||||
className: 'utapi.event', labels: { node: prefix }, start: start + 1, stop: -2,
|
||||
});
|
||||
|
||||
const series = JSON.parse(results.result[0])[0];
|
||||
const timestamps = series.v.map(ev => ev[0]);
|
||||
assert.deepStrictEqual([
|
||||
|
@ -178,8 +179,7 @@ describe('Test IngestShards', function () {
|
|||
], timestamps);
|
||||
});
|
||||
|
||||
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65
|
||||
it.skip('should increment microseconds for several duplicate timestamps', async () => {
|
||||
it('should increment microseconds for several duplicate timestamps', async () => {
|
||||
const start = shardFromTimestamp(getTs(-120));
|
||||
const events = generateFakeEvents(start, start + 5, 5)
|
||||
.map(ev => { ev.timestamp = start; return ev; });
|
||||
|
@ -190,6 +190,7 @@ describe('Test IngestShards', function () {
|
|||
const results = await warp10.fetch({
|
||||
className: 'utapi.event', labels: { node: prefix }, start: start + 5, stop: -5,
|
||||
});
|
||||
|
||||
const series = JSON.parse(results.result[0])[0];
|
||||
const timestamps = series.v.map(ev => ev[0]);
|
||||
assert.deepStrictEqual([
|
||||
|
|
|
@ -177,49 +177,4 @@ describe('Test ReindexTask', function () {
|
|||
assert.strictEqual(series[0].values.length, 2);
|
||||
series[0].values.map(value => assert.deepStrictEqual(value, bucketRecord));
|
||||
});
|
||||
|
||||
describe('exponential backoff', () => {
|
||||
it('should retry when bucketd is unreachable', done => {
|
||||
// disable bucketd to simulate downtime
|
||||
bucketd.end();
|
||||
|
||||
const bucketDStub = sinon.stub(bucketd, '_getBucketResponse');
|
||||
bucketDStub.onFirstCall().callsFake(
|
||||
// Once the timeout promise resolves, bucketd is able to be called.
|
||||
// If we make a call after 10 seconds, this shows that retries
|
||||
// have been occuring during bucketd downtime.
|
||||
() => {
|
||||
return {
|
||||
key: 'foo',
|
||||
value: 'bar',
|
||||
};
|
||||
},
|
||||
);
|
||||
|
||||
const reindexPromise = new Promise((resolve, reject) => {
|
||||
reindexTask._execute()
|
||||
.then(() => {
|
||||
resolve('reindexed');
|
||||
})
|
||||
.catch(err => {
|
||||
reject(err);
|
||||
});
|
||||
});
|
||||
|
||||
const timeoutPromise = new Promise(resolve => {
|
||||
const f = () => {
|
||||
bucketd.start();
|
||||
resolve();
|
||||
};
|
||||
setTimeout(f, 10000);
|
||||
});
|
||||
|
||||
Promise.all([reindexPromise, timeoutPromise])
|
||||
.then(values => {
|
||||
assert.strictEqual(values[0], 'reindexed');
|
||||
sinon.restore();
|
||||
done();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -80,7 +80,7 @@ describe('Test Repair', function () {
|
|||
|
||||
assert.strictEqual(series.length, 3);
|
||||
assertResults(totals, series);
|
||||
await assertMetricValue('s3_utapi_repair_task_created_total', series.length);
|
||||
await assertMetricValue('utapi_repair_task_created_total', series.length);
|
||||
});
|
||||
|
||||
it('should only include events not in an existing correction', async () => {
|
||||
|
|
|
@ -233,8 +233,7 @@ describe('Test UtapiClient', function () {
|
|||
});
|
||||
});
|
||||
|
||||
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65
|
||||
it.skip('should get the current storage for an account using the cache', async () => {
|
||||
it('should get the current storage for an account using the cache', async () => {
|
||||
await async.eachOf(totals.accounts, async (total, acc) => {
|
||||
cacheClient.updateAccountCounterBase(acc, total.bytes);
|
||||
});
|
||||
|
|
|
@ -39,7 +39,7 @@ function getMetricResponse(schemaKey) {
|
|||
return response;
|
||||
}
|
||||
|
||||
function assertMetrics(schemaKey, metricName, props, done) {
|
||||
function assertMetrics(schemaKey, metricName, props, isNegativeValue, done) {
|
||||
const timestamp = new Date().setMinutes(0, 0, 0);
|
||||
const timeRange = [timestamp, timestamp];
|
||||
const expectedRes = getMetricResponse(schemaKey);
|
||||
|
@ -51,6 +51,16 @@ function assertMetrics(schemaKey, metricName, props, done) {
|
|||
datastore,
|
||||
logger,
|
||||
(err, res) => {
|
||||
if (isNegativeValue) {
|
||||
assert(err.is.InternalError);
|
||||
assert.strictEqual(
|
||||
err.description,
|
||||
'Utapi is in a transient state for this time period as '
|
||||
+ 'metrics are being collected. Please try again in a few '
|
||||
+ 'minutes.',
|
||||
);
|
||||
return done();
|
||||
}
|
||||
assert.strictEqual(err, null);
|
||||
// overwrite operations metrics
|
||||
if (expectedResProps.operations) {
|
||||
|
@ -88,12 +98,13 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
|
|||
if (keyIndex === 'storageUtilized' || keyIndex === 'numberOfObjects') {
|
||||
key = generateStateKey(schemaObject, keyIndex);
|
||||
val = isNegativeValue ? -1024 : 1024;
|
||||
props[metricindex] = isNegativeValue ? [0, 0] : [val, val];
|
||||
props[metricindex] = [val, val];
|
||||
memBackend.zadd(key, timestamp, val, () =>
|
||||
assertMetrics(
|
||||
schemaKey,
|
||||
schemaObject[schemaKey],
|
||||
props,
|
||||
isNegativeValue,
|
||||
done,
|
||||
));
|
||||
} else if (keyIndex === 'incomingBytes' || keyIndex === 'outgoingBytes') {
|
||||
|
@ -105,6 +116,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
|
|||
schemaKey,
|
||||
schemaObject[schemaKey],
|
||||
props,
|
||||
isNegativeValue,
|
||||
done,
|
||||
));
|
||||
} else {
|
||||
|
@ -117,6 +129,7 @@ function testOps(schemaKey, keyIndex, metricindex, isNegativeValue, done) {
|
|||
schemaKey,
|
||||
schemaObject[schemaKey],
|
||||
props,
|
||||
isNegativeValue,
|
||||
done,
|
||||
));
|
||||
}
|
||||
|
@ -132,6 +145,7 @@ Object.keys(metricLevels).forEach(schemaKey => {
|
|||
schemaKey,
|
||||
resourceNames[schemaKey],
|
||||
null,
|
||||
false,
|
||||
done,
|
||||
));
|
||||
|
||||
|
|
|
@ -21,9 +21,6 @@ const config = {
|
|||
localCache: redisLocal,
|
||||
component: 's3',
|
||||
};
|
||||
const location = 'foo-backend';
|
||||
const incrby = 100;
|
||||
const decrby = -30;
|
||||
|
||||
function isSortedSetKey(key) {
|
||||
return key.endsWith('storageUtilized') || key.endsWith('numberOfObjects');
|
||||
|
@ -79,29 +76,6 @@ function setMockData(data, timestamp, cb) {
|
|||
return cb();
|
||||
}
|
||||
|
||||
function getLocationObject(bytesValue) {
|
||||
const obj = {};
|
||||
obj[`s3:location:${location}:locationStorage`] = `${bytesValue}`;
|
||||
return obj;
|
||||
}
|
||||
|
||||
function testLocationMetric(c, params, expected, cb) {
|
||||
const { location, updateSize } = params;
|
||||
if (updateSize) {
|
||||
c.pushLocationMetric(location, updateSize, REQUID, err => {
|
||||
assert.equal(err, null);
|
||||
assert.deepStrictEqual(memoryBackend.data, expected);
|
||||
return cb();
|
||||
});
|
||||
} else {
|
||||
c.getLocationMetric(location, REQUID, (err, bytesStored) => {
|
||||
assert.equal(err, null);
|
||||
assert.strictEqual(bytesStored, expected);
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
describe('UtapiClient:: enable/disable client', () => {
|
||||
it('should disable client when no redis config is provided', () => {
|
||||
const c = new UtapiClient();
|
||||
|
@ -273,11 +247,7 @@ tests.forEach(test => {
|
|||
c.setDataStore(ds);
|
||||
c.pushMetric(metric, REQUID, params, () => {
|
||||
deserializeMemoryBackend(memoryBackend.data);
|
||||
Object.keys(expected).forEach(key => {
|
||||
if (memoryBackend.data[key]) {
|
||||
assert.deepStrictEqual(memoryBackend.data[key], expected[key]);
|
||||
}
|
||||
});
|
||||
assert.deepStrictEqual(memoryBackend.data, expected);
|
||||
return cb();
|
||||
});
|
||||
}
|
||||
|
@ -520,7 +490,6 @@ tests.forEach(test => {
|
|||
storageUtilized: '1024',
|
||||
numberOfObjects: '1',
|
||||
};
|
||||
|
||||
setMockData(data, timestamp, () => {
|
||||
testMetric('deleteObject', params, expected, done);
|
||||
});
|
||||
|
@ -698,40 +667,6 @@ tests.forEach(test => {
|
|||
testMetric('putDeleteMarkerObject', metricTypes, expected, done);
|
||||
});
|
||||
|
||||
it('should push putDeleteMarkerObject metrics and have correct bytes and number of objects', done => {
|
||||
const expected = buildExpectedResult({
|
||||
action: 'PutObject',
|
||||
numberOfObjects: '1',
|
||||
});
|
||||
const metrics = {
|
||||
bucket: '5741-repro',
|
||||
keys: ['foo2'],
|
||||
byteLength: undefined,
|
||||
newByteLength: 258,
|
||||
oldByteLength: null,
|
||||
numberOfObjects: 1,
|
||||
accountId: '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||
userId: undefined,
|
||||
};
|
||||
testMetric('putObject', Object.assign(metrics, metricTypes), expected, () => {
|
||||
const expected = buildExpectedResult({
|
||||
action: 'DeleteObject',
|
||||
numberOfObjects: '1',
|
||||
});
|
||||
const metrics2 = {
|
||||
bucket: '5741-repro',
|
||||
keys: ['foo2'],
|
||||
byteLength: 258,
|
||||
newByteLength: undefined,
|
||||
oldByteLength: undefined,
|
||||
numberOfObjects: undefined,
|
||||
accountId: '79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be',
|
||||
userId: undefined,
|
||||
};
|
||||
testMetric('putDeleteMarkerObject', Object.assign(metrics2, metricTypes), expected, done);
|
||||
});
|
||||
});
|
||||
|
||||
it('should push putBucketReplication metrics', done => {
|
||||
const expected = buildExpectedResult({
|
||||
action: 'PutBucketReplication',
|
||||
|
@ -831,27 +766,3 @@ tests.forEach(test => {
|
|||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('UtapiClient:: location quota metrics', () => {
|
||||
beforeEach(function beFn() {
|
||||
this.currentTest.c = new UtapiClient(config);
|
||||
this.currentTest.c.setDataStore(ds);
|
||||
});
|
||||
|
||||
afterEach(() => memoryBackend.flushDb());
|
||||
|
||||
it('should increment location metric', function itFn(done) {
|
||||
const expected = getLocationObject(incrby);
|
||||
testLocationMetric(this.test.c, { location, updateSize: incrby },
|
||||
expected, done);
|
||||
});
|
||||
it('should decrement location metric', function itFn(done) {
|
||||
const expected = getLocationObject(decrby);
|
||||
testLocationMetric(this.test.c, { location, updateSize: decrby },
|
||||
expected, done);
|
||||
});
|
||||
it('should list location metric', function itFn(done) {
|
||||
const expected = 0;
|
||||
testLocationMetric(this.test.c, { location }, expected, done);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -114,21 +114,20 @@ describe('Test middleware', () => {
|
|||
|
||||
req.ctx = new RequestContext(req);
|
||||
middleware.httpMetricsMiddleware(req, resp);
|
||||
await assertMetricValue('s3_utapi_http_requests_total', 1);
|
||||
const durationMetric = 's3_utapi_http_request_duration_seconds';
|
||||
const duration = await getMetricValues(durationMetric);
|
||||
await assertMetricValue('utapi_http_requests_total', 1);
|
||||
const duration = await getMetricValues('utapi_http_request_duration_seconds');
|
||||
// 14 defined buckets + 1 for Infinity
|
||||
assert.strictEqual(
|
||||
duration.filter(metric => metric.metricName === `${durationMetric}_bucket`).length,
|
||||
duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_bucket').length,
|
||||
15,
|
||||
);
|
||||
const count = duration.filter(metric => metric.metricName === `${durationMetric}_count`);
|
||||
const count = duration.filter(metric => metric.metricName === 'utapi_http_request_duration_seconds_count');
|
||||
assert.deepStrictEqual(count, [{
|
||||
labels: {
|
||||
action: 'listMetrics',
|
||||
code: 200,
|
||||
},
|
||||
metricName: `${durationMetric}_count`,
|
||||
metricName: 'utapi_http_request_duration_seconds_count',
|
||||
value: 1,
|
||||
}]);
|
||||
assert.strictEqual(count[0].value, 1);
|
||||
|
@ -138,7 +137,7 @@ describe('Test middleware', () => {
|
|||
const req = templateRequest();
|
||||
req.ctx = new RequestContext(req);
|
||||
middleware.httpMetricsMiddleware(req, resp);
|
||||
assert.rejects(() => getMetricValues('s3_utapi_http_requests_total'));
|
||||
assert.rejects(() => getMetricValues('utapi_http_requests_total'));
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -112,17 +112,6 @@ class BucketD {
|
|||
return body;
|
||||
}
|
||||
|
||||
_getBucketVersionResponse(bucketName) {
|
||||
const body = {
|
||||
CommonPrefixes: [],
|
||||
IsTruncated: false,
|
||||
Versions: (this._bucketContent[bucketName] || [])
|
||||
// patch in a versionId to more closely match the real response
|
||||
.map(entry => ({ ...entry, versionId: 'null' })),
|
||||
};
|
||||
return body;
|
||||
}
|
||||
|
||||
_getShadowBucketOverviewResponse(bucketName) {
|
||||
const mpus = (this._bucketContent[bucketName] || []).map(o => ({
|
||||
key: o.key,
|
||||
|
@ -148,8 +137,6 @@ class BucketD {
|
|||
|| req.query.listingType === 'Delimiter'
|
||||
) {
|
||||
req.body = this._getBucketResponse(bucketName);
|
||||
} else if (req.query.listingType === 'DelimiterVersions') {
|
||||
req.body = this._getBucketVersionResponse(bucketName);
|
||||
}
|
||||
|
||||
// v2 reindex uses `Basic` listing type for everything
|
||||
|
|
Loading…
Reference in New Issue