Compare commits


3 Commits

Author SHA1 Message Date
bbuchanan9 c6ab0ef31b squash: update Dockerfile to successfully build 2019-06-28 13:25:23 -07:00
bbuchanan9 c4ca6ab874 squash: use node 8 base image 2019-06-28 12:58:08 -07:00
bbuchanan9 11af70d2d5 squash: install git in Dockerfile 2019-06-28 12:56:02 -07:00
218 changed files with 1048 additions and 17942 deletions

View File

@ -1,3 +0,0 @@
node_modules/
**/node_modules/
.git

View File

@ -1,25 +1 @@
-{
-"extends": "scality",
-"env": {
-"es6": true
-},
-"parserOptions": {
-"ecmaVersion": 9
-},
-"rules": {
-"no-underscore-dangle": "off",
-"implicit-arrow-linebreak" : "off",
-"import/extensions": 0,
-"prefer-spread": 0,
-"no-param-reassign": 0,
-"array-callback-return": 0
-},
-"settings": {
-"import/resolver": {
-"node": {
-"paths": ["/utapi/node_modules", "node_modules"]
-}
-}
-}
-}
+{ "extends": "scality" }

View File

@ -1,14 +0,0 @@
# Creating this image for the CI as GitHub Actions
# is unable to overwrite the entrypoint
ARG REDIS_IMAGE="redis:latest"
FROM ${REDIS_IMAGE}
ENV REDIS_LISTEN_PORT 6380
ENV REDIS_MASTER_HOST redis
ENV REDIS_MASTER_PORT_NUMBER 6379
ENTRYPOINT redis-server \
--port ${REDIS_LISTEN_PORT} \
--slaveof ${REDIS_MASTER_HOST} ${REDIS_MASTER_PORT_NUMBER}

View File

@ -1,7 +0,0 @@
FROM ghcr.io/scality/vault:c2607856
ENV VAULT_DB_BACKEND LEVELDB
RUN chmod 400 tests/utils/keyfile
ENTRYPOINT yarn start

View File

@ -1,22 +0,0 @@
#!/bin/bash
set -x
set -e -o pipefail
# port for utapi server
PORT=8100
trap killandsleep EXIT
killandsleep () {
kill -9 $(lsof -t -i:$PORT) || true
sleep 10
}
if [ -z "$SETUP_CMD" ]; then
SETUP_CMD="start"
fi
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "setup_$2.log" &
bash tests/utils/wait_for_local_port.bash $PORT 40
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "test_$2.log"

View File

@ -1,65 +0,0 @@
name: build-ci-images
on:
workflow_call:
jobs:
warp10-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: warp10-ci
context: .
file: images/warp10/Dockerfile
lfs: true
redis-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-ci
context: .
file: images/redis/Dockerfile
redis-replica-ci:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
needs:
- redis-ci
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
name: redis-replica-ci
context: .github/docker/redis-replica
build-args: |
REDIS_IMAGE=ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
vault-ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to GitHub Registry
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.repository_owner }}
password: ${{ github.token }}
- name: Build and push vault Image
uses: docker/build-push-action@v5
with:
push: true
context: .github/docker/vault
tags: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
cache-from: type=gha,scope=vault
cache-to: type=gha,mode=max,scope=vault

View File

@ -1,16 +0,0 @@
name: build-dev-image
on:
push:
branches-ignore:
- 'development/**'
jobs:
build-dev:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets:
REGISTRY_LOGIN: ${{ github.repository_owner }}
REGISTRY_PASSWORD: ${{ github.token }}
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}

View File

@ -1,39 +0,0 @@
name: release-warp10
on:
workflow_dispatch:
inputs:
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: true
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
secrets: inherit
with:
name: warp10
context: .
file: images/warp10/Dockerfile
tag: ${{ github.event.inputs.tag }}
lfs: true
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
name: Release utapi/warp10:${{ github.event.inputs.tag }}-warp10
tag_name: ${{ github.event.inputs.tag }}-warp10
generate_release_notes: false
target_commitish: ${{ github.sha }}

View File

@ -1,45 +0,0 @@
name: release
on:
workflow_dispatch:
inputs:
dockerfile:
description: Dockerfile to build image from
type: choice
options:
- images/nodesvc-base/Dockerfile
- Dockerfile
required: true
tag:
type: string
description: 'Tag to be released'
required: true
create-github-release:
type: boolean
description: Create a tag and matching Github release.
required: false
default: false
jobs:
build:
uses: scality/workflows/.github/workflows/docker-build.yaml@v2
with:
namespace: ${{ github.repository_owner }}
name: ${{ github.event.repository.name }}
context: .
file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }}
release:
if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest
needs: build
steps:
- uses: softprops/action-gh-release@v2
env:
GITHUB_TOKEN: ${{ github.token }}
with:
name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }}
generate_release_notes: true
target_commitish: ${{ github.sha }}

View File

@ -1,361 +0,0 @@
---
name: tests
on:
push:
branches-ignore:
- 'development/**'
workflow_dispatch:
inputs:
debug:
description: Debug (enable the ability to SSH to runners)
type: boolean
required: false
default: 'false'
connection-timeout-m:
type: number
required: false
description: Timeout for ssh connection to worker (minutes)
default: 30
jobs:
build-ci:
uses: ./.github/workflows/build-ci.yaml
lint:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: run static analysis tools on markdown
run: yarn run lint_md
- name: run static analysis tools on code
run: yarn run lint
tests-v1:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run unit tests
command: yarn test
env:
UTAPI_METRICS_ENABLED: 'true'
- name: run v1 client tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:client
env: {}
- name: run v1 server tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:server
env: {}
- name: run v1 cron tests
command: bash ./.github/scripts/run_ft_tests.bash false ft_test:cron
env: {}
- name: run v1 interval tests
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:interval
env: {}
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
ports:
- 4802:4802
- 8082:8082
- 9718:9718
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
tests-v2-with-vault:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ENABLE_SENSISION: 't'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 for 60 seconds
run: sleep 60
- name: run v2 functional tests
run: bash ./.github/scripts/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_SERVICE_USER_ENABLED: 'true'
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}
tests-v2-without-sensision:
needs:
- build-ci
runs-on: ubuntu-latest
env:
REINDEX_PYTHON_INTERPRETER: python3
name: ${{ matrix.test.name }}
strategy:
fail-fast: false
matrix:
test:
- name: run v2 soft limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:softLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
- name: run v2 hard limit test
command: bash ./.github/scripts/run_ft_tests.bash true ft_test:hardLimit
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
SETUP_CMD: "run start_v2:server"
services:
redis:
image: ghcr.io/${{ github.repository }}/redis-ci:${{ github.sha }}
ports:
- 6379:6379
- 9121:9121
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-replica:
image: ghcr.io/${{ github.repository }}/redis-replica-ci:${{ github.sha }}
ports:
- 6380:6380
options: >-
--health-cmd "redis-cli -p 6380 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
redis-sentinel:
image: bitnami/redis-sentinel:7.2.4
env:
REDIS_MASTER_SET: scality-s3
REDIS_SENTINEL_PORT_NUMBER: '16379'
REDIS_SENTINEL_QUORUM: '1'
ports:
- 16379:16379
options: >-
--health-cmd "redis-cli -p 16379 ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
warp10:
image: ghcr.io/${{ github.repository }}/warp10-ci:${{ github.sha }}
env:
standalone.port: '4802'
warpscript.maxops: '10000000'
ports:
- 4802:4802
- 8082:8082
- 9718:9718
options: >-
--health-cmd "curl localhost:4802/api/v0/check"
--health-interval 10s
--health-timeout 5s
--health-retries 10
--health-start-period 60s
vault:
image: ghcr.io/${{ github.repository }}/vault-ci:${{ github.sha }}
ports:
- 8500:8500
- 8600:8600
- 8700:8700
- 8800:8800
options: >-
--health-cmd "curl http://localhost:8500/_/healthcheck"
--health-interval 10s
--health-timeout 5s
--health-retries 10
steps:
- name: Checkout
uses: actions/checkout@v4
with:
lfs: true
- uses: actions/setup-node@v4
with:
node-version: '16.13.2'
cache: yarn
- uses: actions/setup-python@v5
with:
python-version: '3.9'
cache: pip
- name: Install python deps
run: pip install -r requirements.txt
- name: install dependencies
run: yarn install --frozen-lockfile --network-concurrency 1
- name: Wait for warp10 a little bit
run: sleep 60
- name: ${{ matrix.test.name }}
run: ${{ matrix.test.command }}
env: ${{ matrix.test.env }}
- name: 'Debug: SSH to runner'
uses: scality/actions/action-ssh-to-runner@1.7.0
timeout-minutes: ${{ fromJSON(github.event.inputs.connection-timeout-m) }}
continue-on-error: true
with:
tmate-server-host: ${{ secrets.TMATE_SERVER_HOST }}
tmate-server-port: ${{ secrets.TMATE_SERVER_PORT }}
tmate-server-rsa-fingerprint: ${{ secrets.TMATE_SERVER_RSA_FINGERPRINT }}
tmate-server-ed25519-fingerprint: ${{ secrets.TMATE_SERVER_ED25519_FINGERPRINT }}
if: ${{ ( github.event.inputs.debug == true || github.event.inputs.debug == 'true' ) }}

.gitignore (vendored)
View File

@ -1,7 +1,4 @@
 dist
-node_modules/
-**/node_modules/
+node_modules
 logs
 *.log
-dump.rdb
-.vscode/

View File

@ -1,5 +0,0 @@
# .prettierrc or .prettierrc.yaml
trailingComma: "all"
tabWidth: 4
arrowParens: avoid
singleQuote: true

View File

@ -1,31 +1,25 @@
-FROM node:16.13.2-buster-slim
+FROM node:8-slim
 WORKDIR /usr/src/app
-COPY package.json yarn.lock /usr/src/app/
+COPY package.json /usr/src/app
 RUN apt-get update \
-&& apt-get install -y \
-curl \
-gnupg2
-RUN curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
-&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list
-RUN apt-get update \
-&& apt-get install -y jq git python3 build-essential yarn --no-install-recommends \
-&& yarn cache clean \
-&& yarn install --frozen-lockfile --production --ignore-optional --network-concurrency=1 \
-&& apt-get autoremove --purge -y python3 git build-essential \
+&& apt-get install git -y \
+&& apt-get install -y jq --no-install-recommends \
+&& apt install build-essential -y \
+&& apt-get install python -y \
+&& apt-get install g++ -y \
+&& npm install --production \
 && rm -rf /var/lib/apt/lists/* \
-&& yarn cache clean \
+&& npm cache clear --force \
 && rm -rf ~/.node-gyp \
-&& rm -rf /tmp/yarn-*
+&& rm -rf /tmp/npm-*
 # Keep the .git directory in order to properly report version
 COPY . /usr/src/app
 ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
-CMD [ "yarn", "start" ]
+CMD [ "npm", "start" ]
 EXPOSE 8100

View File

@ -3,21 +3,14 @@
 ![Utapi logo](res/utapi-logo.png)
 [![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi)
+[![Scality CI][badgepriv]](http://ci.ironmann.io/gh/scality/utapi)
-Service Utilization API for tracking resource usage and metrics reporting.
+Service Utilization API for tracking resource usage and metrics reporting
 ## Design
 Please refer to the [design](/DESIGN.md) for more information.
-## Server
-To run the server:
-```
-npm start
-```
 ## Client
 The module exposes a client, named UtapiClient. Projects can use this client to
@ -87,13 +80,13 @@ Server is running.
 1. Create an IAM user
 ```
-aws iam --endpoint-url <endpoint> create-user --user-name <user-name>
+aws iam --endpoint-url <endpoint> create-user --user-name utapiuser
 ```
 2. Create access key for the user
 ```
-aws iam --endpoint-url <endpoint> create-access-key --user-name <user-name>
+aws iam --endpoint-url <endpoint> create-access-key --user-name utapiuser
 ```
 3. Define a managed IAM policy
@ -202,11 +195,12 @@ Server is running.
 5. Attach user to the managed policy
 ```
-aws --endpoint-url <endpoint> iam attach-user-policy --user-name
-<user-name> --policy-arn <policy arn>
+aws --endpoint-url <endpoint> iam attach-user-policy --user-name utapiuser
+--policy-arn <policy arn>
 ```
-Now the user has access to ListMetrics request in Utapi on all buckets.
+Now the user `utapiuser` has access to ListMetrics request in Utapi on all
+buckets.
 ### Signing request with Auth V4
@ -222,18 +216,16 @@ following urls for reference.
 You may also view examples making a request with Auth V4 using various languages
 and AWS SDKs [here](/examples).
-Alternatively, you can use a nifty command line tool available in Scality's
-CloudServer.
+Alternatively, you can use a nifty command line tool available in Scality's S3.
-You can git clone the CloudServer repo from here
-https://github.com/scality/cloudserver and follow the instructions in the README
-to install the dependencies.
+You can git clone S3 repo from here https://github.com/scality/S3.git and follow
+the instructions in README to install the dependencies.
-If you have CloudServer running inside a docker container you can docker exec
-into the CloudServer container as
+If you have S3 running inside a docker container you can docker exec into the S3
+container as
 ```
-docker exec -it <container-id> bash
+docker exec -it <container id> bash
 ```
 and then run the command
@ -271,7 +263,7 @@ Usage: list_metrics [options]
 -v, --verbose
 ```
-An example call to list metrics for a bucket `demo` to Utapi in a https enabled
+A typical call to list metrics for a bucket `demo` to Utapi in a https enabled
 deployment would be
 ```
@ -283,7 +275,7 @@ Both start and end times are time expressed as UNIX epoch timestamps **expressed
 in milliseconds**.
 Keep in mind, since Utapi metrics are normalized to the nearest 15 min.
-interval, start time and end time need to be in the specific format as follows.
+interval, so start time and end time need to be in specific format as follows.
 #### Start time
@ -297,7 +289,7 @@ Date: Tue Oct 11 2016 17:35:25 GMT-0700 (PDT)
 Unix timestamp (milliseconds): 1476232525320
-Here's an example JS method to get a start timestamp
+Here's a typical JS method to get start timestamp
 ```javascript
 function getStartTimestamp(t) {
@ -317,7 +309,7 @@ seconds and milliseconds set to 59 and 999 respectively. So valid end timestamps
 would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and
 `09:59:59:999`.
-Here's an example JS method to get an end timestamp
+Here's a typical JS method to get end timestamp
 ```javascript
 function getEndTimestamp(t) {
@ -342,3 +334,4 @@ In order to contribute, please follow the
 https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md).
 [badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg
+[badgepriv]: http://ci.ironmann.io/gh/scality/utapi.svg?style=svg

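The 15-minute normalization rule described in the README hunk above can be illustrated with a short sketch. These helpers are hypothetical stand-ins, not the `getStartTimestamp`/`getEndTimestamp` bodies elided by the diff; they only encode the rule as stated (start floored to :00/:15/:30/:45, end at seconds 59 and milliseconds 999):

```javascript
// Hypothetical helpers illustrating the normalization rule quoted above.
function floorToQuarterHour(t) {
    const d = new Date(t);
    // round minutes down to 0, 15, 30 or 45 and zero out seconds/milliseconds
    d.setMinutes(Math.floor(d.getMinutes() / 15) * 15, 0, 0);
    return d.getTime();
}

function endOfQuarterHour(t) {
    const d = new Date(t);
    const quarter = Math.floor(d.getMinutes() / 15);
    // e.g. 09:14:59.999, 09:29:59.999, 09:44:59.999, 09:59:59.999
    d.setMinutes(quarter * 15 + 14, 59, 999);
    return d.getTime();
}

// Example timestamp taken from the README text (Tue Oct 11 2016 17:35:25 PDT).
console.log(floorToQuarterHour(1476232525320)); // start of the containing interval
console.log(endOfQuarterHour(1476232525320));   // end of the containing interval
```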
View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateCheckpoint',
});
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting checkpoint creation'))
.then(() => task.start())
.then(() => logger.info('Checkpoint creation started'));

View File

@ -1,14 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'CreateSnapshot',
});
const task = new tasks.CreateSnapshot({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting snapshot creation'))
.then(() => task.start())
.then(() => logger.info('Snapshot creation started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'MonitorDiskUsage',
});
const task = new tasks.MonitorDiskUsage({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting disk usage monitor'))
.then(() => task.start())
.then(() => logger.info('Disk usage monitor started'));

View File

@ -1,276 +0,0 @@
#! /usr/bin/env node
// TODO
// - deduplicate with Vault's seed script at https://github.com/scality/Vault/pull/1627
// - add permission boundaries to user when https://scality.atlassian.net/browse/VAULT-4 is implemented
const { errors } = require('arsenal');
const program = require('commander');
const werelogs = require('werelogs');
const async = require('async');
const { IAM } = require('aws-sdk');
const { version } = require('../package.json');
const systemPrefix = '/scality-internal/';
function generateUserPolicyDocument() {
return {
Version: '2012-10-17',
Statement: {
Effect: 'Allow',
Action: 'utapi:ListMetrics',
Resource: 'arn:scality:utapi:::*/*',
},
};
}
function createIAMClient(opts) {
return new IAM({
endpoint: opts.iamEndpoint,
});
}
function needsCreation(v) {
if (Array.isArray(v)) {
return !v.length;
}
return !v;
}
class BaseHandler {
constructor(serviceName, iamClient, log) {
this.serviceName = serviceName;
this.iamClient = iamClient;
this.log = log;
}
applyWaterfall(values, done) {
this.log.debug('applyWaterfall', { values, type: this.resourceType });
const v = values[this.resourceType];
if (needsCreation(v)) {
this.log.debug('creating', { v, type: this.resourceType });
return this.create(values)
.then(res =>
done(null, Object.assign(values, {
[this.resourceType]: res,
})))
.catch(done);
}
this.log.debug('conflicts check', { v, type: this.resourceType });
if (this.conflicts(v)) {
return done(errors.EntityAlreadyExists.customizeDescription(
`${this.resourceType} ${this.serviceName} already exists and conflicts with the expected value.`));
}
this.log.debug('nothing to do', { v, type: this.resourceType });
return done(null, values);
}
}
class UserHandler extends BaseHandler {
get resourceType() {
return 'user';
}
collect() {
return this.iamClient.getUser({
UserName: this.serviceName,
})
.promise()
.then(res => res.User);
}
create(allResources) {
return this.iamClient.createUser({
UserName: this.serviceName,
Path: systemPrefix,
})
.promise()
.then(res => res.User);
}
conflicts(u) {
return u.Path !== systemPrefix;
}
}
class PolicyHandler extends BaseHandler {
get resourceType() {
return 'policy';
}
collect() {
return this.iamClient.listPolicies({
MaxItems: 100,
OnlyAttached: false,
Scope: 'All',
})
.promise()
.then(res => res.Policies.find(p => p.PolicyName === this.serviceName));
}
create(allResources) {
const doc = generateUserPolicyDocument();
return this.iamClient.createPolicy({
PolicyName: this.serviceName,
PolicyDocument: JSON.stringify(doc),
Path: systemPrefix,
})
.promise()
.then(res => res.Policy);
}
conflicts(p) {
return p.Path !== systemPrefix;
}
}
class PolicyAttachmentHandler extends BaseHandler {
get resourceType() {
return 'policyAttachment';
}
collect() {
return this.iamClient.listAttachedUserPolicies({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AttachedPolicies)
}
create(allResources) {
return this.iamClient.attachUserPolicy({
PolicyArn: allResources.policy.Arn,
UserName: this.serviceName,
})
.promise();
}
conflicts(p) {
return false;
}
}
class AccessKeyHandler extends BaseHandler {
get resourceType() {
return 'accessKey';
}
collect() {
return this.iamClient.listAccessKeys({
UserName: this.serviceName,
MaxItems: 100,
})
.promise()
.then(res => res.AccessKeyMetadata)
}
create(allResources) {
return this.iamClient.createAccessKey({
UserName: this.serviceName,
})
.promise()
.then(res => res.AccessKey);
}
conflicts(a) {
return false;
}
}
function collectResource(v, done) {
v.collect()
.then(res => done(null, res))
.catch(err => {
if (err.code === 'NoSuchEntity') {
return done(null, null);
}
done(err);
});
}
function collectResourcesFromHandlers(handlers, cb) {
const tasks = handlers.reduce((acc, v) => ({
[v.resourceType]: done => collectResource(v, done),
...acc,
}), {});
async.parallel(tasks, cb);
}
function buildServiceUserHandlers(serviceName, client, log) {
return [
UserHandler,
PolicyHandler,
PolicyAttachmentHandler,
AccessKeyHandler,
].map(h => new h(serviceName, client, log));
}
function apply(client, serviceName, log, cb) {
const handlers = buildServiceUserHandlers(serviceName, client, log);
async.waterfall([
done => collectResourcesFromHandlers(handlers, done),
...handlers.map(h => h.applyWaterfall.bind(h)),
(values, done) => done(null, values.accessKey),
], cb);
}
function wrapAction(actionFunc, serviceName, options) {
werelogs.configure({
level: options.logLevel,
dump: options.logDumpLevel,
});
const log = new werelogs.Logger(process.argv[1]).newRequestLogger();
const client = createIAMClient(options);
actionFunc(client, serviceName, log, (err, data) => {
if (err) {
log.error('failed', {
data,
error: err,
});
if (err.EntityAlreadyExists) {
log.error(`run "${process.argv[1]} purge ${serviceName}" to fix.`);
}
process.exit(1);
}
log.info('success', { data });
process.exit();
});
}
program.version(version);
[
{
name: 'apply <service-name>',
actionFunc: apply,
},
].forEach(cmd => {
program
.command(cmd.name)
.option('--iam-endpoint <url>', 'IAM endpoint', 'http://localhost:8600')
.option('--log-level <level>', 'log level', 'info')
.option('--log-dump-level <level>', 'log level that triggers a dump of the debug buffer', 'error')
.action(wrapAction.bind(null, cmd.actionFunc));
});
const validCommands = program.commands.map(n => n._name);
// Is the command given invalid or are there too few arguments passed
if (!validCommands.includes(process.argv[2])) {
program.outputHelp();
process.stdout.write('\n');
process.exit(1);
} else {
program.parse(process.argv);
}

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'IngestShard',
});
const task = new tasks.IngestShard({ warp10: warp10Clients });
task.setup()
.then(() => logger.info('Starting shard ingestion'))
.then(() => task.start())
.then(() => logger.info('Ingestion started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'ManualAdjust',
});
const task = new tasks.ManualAdjust({ warp10: warp10Clients });
task.setup()
.then(() => logger.info('Starting manual adjustment'))
.then(() => task.start())
.then(() => logger.info('Manual adjustment started'));

View File

@ -1,14 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Migrate',
});
const task = new tasks.MigrateTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting utapi v1 => v2 migration'))
.then(() => task.start())
.then(() => logger.info('Migration started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Reindex',
});
const task = new tasks.ReindexTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting Reindex daemon'))
.then(() => task.start())
.then(() => logger.info('Reindex started'));

View File

@ -1,15 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const logger = new LoggerContext({
task: 'Repair',
});
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
task.setup()
.then(() => logger.info('Starting Repair daemon'))
.then(() => task.start())
.then(() => logger.info('Repair started'));

View File

@ -1,9 +0,0 @@
const { startUtapiServer } = require('..');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({ module: 'entrypoint' });
startUtapiServer().then(
() => logger.info('utapi started'),
error => logger.error('Unhandled Error', { error }),
);

View File

@ -1,75 +0,0 @@
version: '3.8'
x-models:
warp10: &warp10
build:
context: .
dockerfile: ./images/warp10/Dockerfile
volumes: [ $PWD/warpscript:/usr/local/share/warpscript ]
warp10_env: &warp10_env
ENABLE_WARPSTUDIO: 'true'
ENABLE_SENSISION: 'true'
warpscript.repository.refresh: 1000
warpscript.maxops: 1000000000
warpscript.maxops.hard: 1000000000
warpscript.maxfetch: 1000000000
warpscript.maxfetch.hard: 1000000000
warpscript.extension.debug: io.warp10.script.ext.debug.DebugWarpScriptExtension
warpscript.maxrecursion: 1000
warpscript.repository.directory: /usr/local/share/warpscript
warpscript.extension.logEvent: io.warp10.script.ext.logging.LoggingWarpScriptExtension
redis: &redis
build:
context: .
dockerfile: ./images/redis/Dockerfile
services:
redis-0:
image: redis:7.2.4
command: redis-server --port 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6379:6379
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-1:
image: redis:7.2.4
command: redis-server --port 6380 --slaveof "${EXTERNAL_HOST}" 6379 --slave-announce-ip "${EXTERNAL_HOST}"
ports:
- 6380:6380
environment:
- HOST_IP="${EXTERNAL_HOST}"
redis-sentinel-0:
image: redis:7.2.4
command: |-
bash -c 'cat > /tmp/sentinel.conf <<EOF
port 16379
logfile ""
dir /tmp
sentinel announce-ip ${EXTERNAL_HOST}
sentinel announce-port 16379
sentinel monitor scality-s3 "${EXTERNAL_HOST}" 6379 1
EOF
redis-sentinel /tmp/sentinel.conf'
environment:
- HOST_IP="${EXTERNAL_HOST}"
ports:
- 16379:16379
warp10:
<< : *warp10
environment:
<< : *warp10_env
ports:
- 4802:4802
- 8081:8081
- 9718:9718
volumes:
- /tmp/warp10:/data
- '${PWD}/warpscript:/usr/local/share/warpscript'

View File

@ -1,42 +0,0 @@
# Utapi Release Plan
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
* Namespace: ghcr.io/scality/utapi
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
This allows those images to be used by developers, CI builds,
build chain and so on.
Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
```
## Release Process
To release a production image:
* Name the tag for the repository and Docker image.
* Use the `yarn version` command with the same tag to update `package.json`.
* Create a PR and merge the `package.json` change.
* Tag the repository using the same tag.
* [Force a build] using:
* A given branch that ideally matches the tag.
* The `release` stage.
* An extra property with the name `tag` and its value being the actual tag.
[Force a build]:
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force

eve/main.yml (new file)
View File

@ -0,0 +1,39 @@
---
version: 0.2
branches:
default:
stage: pre-merge
stages:
pre-merge:
worker:
type: docker
path: eve/workers/unit_and_feature_tests
volumes:
- '/home/eve/workspace'
steps:
- Git:
name: fetch source
repourl: '%(prop:git_reference)s'
shallow: True
retryFetch: True
haltOnFailure: True
- ShellCommand:
name: npm install
command: npm install
# - ShellCommand:
# name: get api node modules from cache
# command: mv /home/eve/node_reqs/node_modules .
- ShellCommand:
name: run static analysis tools on markdown
command: npm run lint_md
- ShellCommand:
name: run static analysis tools on code
command: npm run lint
- ShellCommand:
name: run unit tests
command: npm test
- ShellCommand:
name: run feature tests
command: npm run ft_test

View File

@ -0,0 +1,26 @@
FROM buildpack-deps:jessie-curl
#
# Install apt packages needed by backbeat and buildbot_worker
#
ENV LANG C.UTF-8
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
RUN curl -sL https://deb.nodesource.com/setup_6.x | bash - \
&& apt-get update -qq \
&& cat /tmp/*packages.list | xargs apt-get install -y \
&& pip install pip==9.0.1 \
&& rm -rf /var/lib/apt/lists/* \
&& rm -f /tmp/*packages.list \
&& rm -f /etc/supervisor/conf.d/*.conf
#
# Run buildbot-worker on startup through supervisor
#
ARG BUILDBOT_VERSION
RUN pip install buildbot-worker==$BUILDBOT_VERSION
ADD supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
CMD ["supervisord", "-n"]

View File

@ -0,0 +1,9 @@
ca-certificates
git
libffi-dev
libssl-dev
python2.7
python2.7-dev
python-pip
sudo
supervisor

View File

@ -0,0 +1,9 @@
[program:buildbot_worker]
command=/bin/sh -c 'buildbot-worker create-worker . "%(ENV_BUILDMASTER)s:%(ENV_BUILDMASTER_PORT)s" "%(ENV_WORKERNAME)s" "%(ENV_WORKERPASS)s" && buildbot-worker start --nodaemon'
autostart=true
autorestart=false
[program:redis]
command=/usr/bin/redis-server
autostart=true
autorestart=false

View File

@ -0,0 +1,3 @@
build-essential
redis-server
nodejs

View File

@ -1,6 +1,5 @@
 const http = require('http');
-// eslint-disable-next-line import/no-extraneous-dependencies
-const aws4 = require('aws4');
+const aws4 = require('aws4'); // eslint-disable-line import/no-unresolved
 // Input AWS access key, secret key, and session token.
 const accessKeyId = 'EO4FRH6BA2L7FCK0EKVT';

View File

@ -1,20 +0,0 @@
FROM ghcr.io/scality/federation/nodesvc-base:7.10.5.0
ENV UTAPI_CONFIG_FILE=${CONF_DIR}/config.json
WORKDIR ${HOME_DIR}/utapi
COPY ./package.json ./yarn.lock ${HOME_DIR}/utapi
# Remove when gitcache is sorted out
RUN rm /root/.gitconfig
RUN yarn install --production --frozen-lockfile --network-concurrency 1
COPY . ${HOME_DIR}/utapi
RUN chown -R ${USER} ${HOME_DIR}/utapi
USER ${USER}
CMD bash -c "source ${CONF_DIR}/env && export && supervisord -c ${CONF_DIR}/${SUPERVISORD_CONF}"

View File

@ -1,17 +0,0 @@
FROM redis:alpine
ENV S6_VERSION 2.0.0.1
ENV EXPORTER_VERSION 1.24.0
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz
RUN wget https://github.com/oliver006/redis_exporter/releases/download/v${EXPORTER_VERSION}/redis_exporter-v${EXPORTER_VERSION}.linux-amd64.tar.gz -O redis_exporter.tar.gz \
&& tar xzf redis_exporter.tar.gz -C / \
&& cd .. \
&& mv /redis_exporter-v${EXPORTER_VERSION}.linux-amd64/redis_exporter /usr/local/bin/redis_exporter
ADD ./images/redis/s6 /etc
CMD /init

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis exporter"
exec redis_exporter

View File

@ -1,4 +0,0 @@
#!/usr/bin/with-contenv sh
echo "starting redis"
exec redis-server

View File

@ -1,2 +0,0 @@
standalone.host = 0.0.0.0
standalone.port = 4802

View File

@ -1,56 +0,0 @@
FROM golang:1.14-alpine as builder
ENV WARP10_EXPORTER_VERSION 2.7.5
RUN apk add zip unzip build-base \
&& wget -q -O exporter.zip https://github.com/centreon/warp10-sensision-exporter/archive/refs/heads/master.zip \
&& unzip exporter.zip \
&& cd warp10-sensision-exporter-master \
&& go mod download \
&& cd tools \
&& go run generate_sensision_metrics.go ${WARP10_EXPORTER_VERSION} \
&& cp sensision.go ../collector/ \
&& cd .. \
&& go build -a -o /usr/local/go/warp10_sensision_exporter
FROM ghcr.io/scality/utapi/warp10:2.8.1-95-g73e7de80
# Override baked in version
# Remove when updating to a numbered release
ENV WARP10_VERSION 2.8.1-95-g73e7de80
ENV S6_VERSION 2.0.0.1
ENV S6_BEHAVIOUR_IF_STAGE2_FAILS 2
ENV WARP10_CONF_TEMPLATES ${WARP10_HOME}/conf.templates/standalone
ENV SENSISION_DATA_DIR /data/sensision
ENV SENSISION_PORT 8082
# Modify Warp 10 default config
ENV standalone.home /opt/warp10
ENV warpscript.repository.directory /usr/local/share/warpscript
ENV warp.token.file /static.tokens
ENV warpscript.extension.protobuf io.warp10.ext.protobuf.ProtobufWarpScriptExtension
ENV warpscript.extension.macrovalueencoder 'io.warp10.continuum.ingress.MacroValueEncoder$Extension'
# ENV warpscript.extension.debug io.warp10.script.ext.debug.DebugWarpScriptExtension
RUN wget https://github.com/just-containers/s6-overlay/releases/download/v${S6_VERSION}/s6-overlay-amd64.tar.gz -O /tmp/s6-overlay-amd64.tar.gz \
&& tar xzf /tmp/s6-overlay-amd64.tar.gz -C / \
&& rm -rf /tmp/s6-overlay-amd64.tar.gz
# Install jmx exporter
ADD https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar /opt/jmx_prom_agent.jar
ADD ./images/warp10/jmx_prom.yaml /opt/jmx_prom.yaml
# Install protobuf extestion
ADD ./images/warp10/warp10-ext-protobuf-1.2.2-uberjar.jar /opt/warp10/lib/
# Install Sensision exporter
COPY --from=builder /usr/local/go/warp10_sensision_exporter /usr/local/bin/warp10_sensision_exporter
ADD ./images/warp10/s6 /etc
ADD ./warpscript /usr/local/share/warpscript
ADD ./images/warp10/static.tokens /
ADD ./images/warp10/90-default-host-port.conf $WARP10_CONF_TEMPLATES/90-default-host-port.conf
CMD /init

View File

@ -1,2 +0,0 @@
---
startDelaySeconds: 0

View File

@ -1,22 +0,0 @@
#!/usr/bin/with-contenv bash
set -eu
ensureDir() {
if [ ! -d "$1" ]; then
mkdir -p "$1"
echo "Created directory $1"
fi
}
ensureDir "$WARP10_DATA_DIR"
ensureDir "$WARP10_DATA_DIR/logs"
ensureDir "$WARP10_DATA_DIR/conf"
ensureDir "$WARP10_DATA_DIR/data/leveldb"
ensureDir "$WARP10_DATA_DIR/data/datalog"
ensureDir "$WARP10_DATA_DIR/data/datalog_done"
ensureDir "$SENSISION_DATA_DIR"
ensureDir "$SENSISION_DATA_DIR/logs"
ensureDir "$SENSISION_DATA_DIR/conf"
ensureDir "/var/run/sensision"

View File

@ -1,14 +0,0 @@
#!/usr/bin/with-contenv bash
set -eu
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
WARP10_SECRETS="$WARP10_CONFIG_DIR/00-secrets.conf"
if [ ! -f "$WARP10_SECRETS" ]; then
cp "$WARP10_CONF_TEMPLATES/00-secrets.conf.template" "$WARP10_SECRETS"
/usr/bin/java -cp ${WARP10_JAR} -Dfile.encoding=UTF-8 io.warp10.GenerateCryptoKey ${WARP10_SECRETS}
echo "warp10.manager.secret = scality" >> $WARP10_SECRETS
fi

View File

@ -1,14 +0,0 @@
#!/usr/bin/with-contenv sh
echo "Installing warp 10 config"
for path in $WARP10_CONF_TEMPLATES/*; do
name="$(basename $path .template)"
if [ ! -f "$WARP10_DATA_DIR/conf/$name" ]; then
cp "$path" "$WARP10_DATA_DIR/conf/$name"
echo "Copied $name to $WARP10_DATA_DIR/conf/$name"
fi
done
echo "Installing sensision config"
cp ${SENSISION_HOME}/templates/sensision.template ${SENSISION_DATA_DIR}/conf/sensision.conf
cp ${SENSISION_HOME}/templates/log4j.properties.template ${SENSISION_DATA_DIR}/conf/log4j.properties

View File

@ -1,23 +0,0 @@
#!/usr/bin/with-contenv sh
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
ensure_link() {
if [ ! -L "$1" ]; then
rm -rf "$1"
ln -s "$2" "$1"
echo "Created symlink $1->$2"
fi
}
ensure_link "$WARP10_HOME/logs" "$WARP10_DATA_DIR/logs"
ensure_link "$WARP10_HOME/etc/conf.d" "$WARP10_DATA_DIR/conf"
ensure_link "$WARP10_HOME/leveldb" "$WARP10_DATA_DIR/data/leveldb"
ensure_link "$WARP10_HOME/datalog" "$WARP10_DATA_DIR/data/datalog"
ensure_link "$WARP10_HOME/datalog_done" "$WARP10_DATA_DIR/data/datalog_done"
ensure_link "$SENSISION_HOME/etc" "${SENSISION_DATA_DIR}/conf"
ensure_link "$SENSISION_HOME/logs" "${SENSISION_DATA_DIR}/logs"
ensure_link /var/run/sensision/metrics ${SENSISION_HOME}/metrics
ensure_link /var/run/sensision/targets ${SENSISION_HOME}/targets
ensure_link /var/run/sensision/queued ${SENSISION_HOME}/queued

View File

@ -1,14 +0,0 @@
#!/usr/bin/with-contenv sh
JAVA="/usr/bin/java"
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_INIT="io.warp10.standalone.WarpInit"
LEVELDB_HOME="/opt/warp10/leveldb"
# Create leveldb database
if [ "$(find -L ${LEVELDB_HOME} -maxdepth 1 -type f | wc -l)" -eq 0 ]; then
echo "Init leveldb database..." | tee -a "$WARP10_HOME/logs/warp10.log"
$JAVA -cp "$WARP10_CP" "$WARP10_INIT" "$LEVELDB_HOME" | tee -a "$WARP10_HOME/logs/warp10.log" 2>&1
fi

View File

@ -1,11 +0,0 @@
#!/usr/bin/with-contenv sh
WARPSTUDIO_CONFIG=${WARP10_CONFIG_DIR}/80-warpstudio-plugin.conf
if [ -n "$ENABLE_WARPSTUDIO" ]; then
cat > $WARPSTUDIO_CONFIG << EOF
warp10.plugin.warpstudio = io.warp10.plugins.warpstudio.WarpStudioPlugin
warpstudio.port = 8081
warpstudio.host = \${standalone.host}
EOF
fi

View File

@ -1,9 +0,0 @@
#!/usr/bin/with-contenv sh
chmod 1733 "$SENSISION_HOME/metrics"
chmod 1733 "$SENSISION_HOME/targets"
chmod 700 "$SENSISION_HOME/queued"
sed -i 's/@warp:WriteToken@/'"writeTokenStatic"'/' $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e "s_^sensision\.home.*_sensision\.home = ${SENSISION_HOME}_" $SENSISION_DATA_DIR/conf/sensision.conf
sed -i -e 's_^sensision\.qf\.url\.default.*_sensision\.qf\.url\.default=http://127.0.0.1:4802/api/v0/update_' $SENSISION_DATA_DIR/conf/sensision.conf

View File

@ -1,12 +0,0 @@
#!/usr/bin/with-contenv sh
EXPORTER_CMD="warp10_sensision_exporter --warp10.url=http://localhost:${SENSISION_PORT}/metrics"
if [ -f "/usr/local/bin/warp10_sensision_exporter" -a -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision exporter with $EXPORTER_CMD ..."
exec $EXPORTER_CMD
else
echo "Sensision is disabled. Not starting exporter."
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -1,25 +0,0 @@
#!/usr/bin/with-contenv sh
JAVA="/usr/bin/java"
JAVA_OPTS=""
SENSISION_CONFIG=${SENSISION_DATA_DIR}/conf/sensision.conf
SENSISION_JAR=${SENSISION_HOME}/bin/sensision-${SENSISION_VERSION}.jar
SENSISION_CP=${SENSISION_HOME}/etc:${SENSISION_JAR}
SENSISION_CLASS=io.warp10.sensision.Main
export MALLOC_ARENA_MAX=1
if [ -z "$SENSISION_HEAP" ]; then
SENSISION_HEAP=64m
fi
SENSISION_CMD="${JAVA} ${JAVA_OPTS} -Xmx${SENSISION_HEAP} -Dsensision.server.port=${SENSISION_PORT} ${SENSISION_OPTS} -Dsensision.config=${SENSISION_CONFIG} -cp ${SENSISION_CP} ${SENSISION_CLASS}"
if [ -n "$ENABLE_SENSISION" ]; then
echo "Starting Sensision with $SENSISION_CMD ..."
exec $SENSISION_CMD | tee -a ${SENSISION_HOME}/logs/sensision.log
else
echo "Sensision is disabled. Not starting."
# wait indefinitely
exec tail -f /dev/null
fi

View File

@ -1,43 +0,0 @@
#!/usr/bin/with-contenv sh
export SENSISIONID=warp10
export MALLOC_ARENA_MAX=1
JAVA="/usr/bin/java"
WARP10_JAR=${WARP10_HOME}/bin/warp10-${WARP10_VERSION}.jar
WARP10_CLASS=io.warp10.standalone.Warp
WARP10_CP="${WARP10_HOME}/etc:${WARP10_JAR}:${WARP10_HOME}/lib/*"
WARP10_CONFIG_DIR="$WARP10_DATA_DIR/conf"
CONFIG_FILES="$(find ${WARP10_CONFIG_DIR} -not -path '*/.*' -name '*.conf' | sort | tr '\n' ' ' 2> /dev/null)"
LOG4J_CONF=${WARP10_HOME}/etc/log4j.properties
if [ -z "$WARP10_HEAP" ]; then
WARP10_HEAP=1g
fi
if [ -z "$WARP10_HEAP_MAX" ]; then
WARP10_HEAP_MAX=4g
fi
JAVA_OPTS="-Dlog4j.configuration=file:${LOG4J_CONF} ${JAVA__EXTRA_OPTS} -Djava.awt.headless=true -Xms${WARP10_HEAP} -Xmx${WARP10_HEAP_MAX} -XX:+UseG1GC"
SENSISION_OPTS=
if [ -n "$ENABLE_SENSISION" ]; then
_SENSISION_LABELS=
# Expects a comma seperated list of key=value ex key=value,foo=bar
if [ -n "$SENSISION_LABELS" ]; then
_SENSISION_LABELS="-Dsensision.default.labels=$SENSISION_LABELS"
fi
SENSISION_OPTS="${_SENSISION_LABELS} -Dsensision.events.dir=/var/run/sensision/metrics -Dfile.encoding=UTF-8 ${SENSISION_EXTRA_OPTS}"
fi
JMX_EXPORTER_OPTS=
if [ -n "$ENABLE_JMX_EXPORTER" ]; then
JMX_EXPORTER_OPTS="-javaagent:/opt/jmx_prom_agent.jar=4803:/opt/jmx_prom.yaml ${JMX_EXPORTER_EXTRA_OPTS}"
echo "Starting jmx exporter with Warp 10."
fi
WARP10_CMD="${JAVA} ${JMX_EXPORTER_OPTS} ${JAVA_OPTS} ${SENSISION_OPTS} -cp ${WARP10_CP} ${WARP10_CLASS} ${CONFIG_FILES}"
echo "Starting Warp 10 with $WARP10_CMD ..."
exec $WARP10_CMD | tee -a ${WARP10_HOME}/logs/warp10.log

View File

@ -1,9 +0,0 @@
token.write.0.name=writeTokenStatic
token.write.0.producer=42424242-4242-4242-4242-424242424242
token.write.0.owner=42424242-4242-4242-4242-424242424242
token.write.0.app=utapi
token.read.0.name=readTokenStatic
token.read.0.owner=42424242-4242-4242-4242-424242424242
token.read.0.app=utapi

View File

@ -1,22 +1,7 @@
-/* eslint-disable global-require */
-// eslint-disable-line strict
-let toExport;
-if (process.env.ENABLE_UTAPI_V2) {
-toExport = {
-utapiVersion: 2,
-startUtapiServer: require('./libV2/server').startServer,
-UtapiClient: require('./libV2/client'),
-tasks: require('./libV2/tasks'),
-};
-} else {
-toExport = {
-utapiVersion: 1,
-UtapiServer: require('./lib/server'),
-UtapiClient: require('./lib/UtapiClient'),
-UtapiReplay: require('./lib/UtapiReplay'),
-UtapiReindex: require('./lib/UtapiReindex'),
-};
-}
-module.exports = toExport;
+'use strict'; // eslint-disable-line strict
+module.exports = {
+UtapiServer: require('./lib/server'),
+UtapiClient: require('./lib/UtapiClient.js'),
+UtapiReplay: require('./lib/UtapiReplay.js'),
+};

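To make the export change above concrete, here is a hedged consumer sketch. The `utapi` module name is an assumption, and only the export names visible in the hunk are used; the `-` (newer) entry point adds `utapiVersion` and the v2 exports when ENABLE_UTAPI_V2 is set, while the `+` (older) one always exposes the v1 classes:

```javascript
// Hypothetical consumer of the entry point diffed above.
const utapi = require('utapi'); // assumed package name

if (utapi.utapiVersion === 2) {
    // '-' side with ENABLE_UTAPI_V2 set at require time.
    const { startUtapiServer, UtapiClient, tasks } = utapi;
    console.log(typeof startUtapiServer, typeof UtapiClient, typeof tasks);
} else {
    // '+' side (or the '-' side without ENABLE_UTAPI_V2): v1 classes only.
    const { UtapiServer, UtapiClient, UtapiReplay } = utapi;
    console.log(typeof UtapiServer, typeof UtapiClient, typeof UtapiReplay);
}
```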
View File

@ -1,13 +1,34 @@
-/* eslint-disable no-bitwise */
 const assert = require('assert');
 const fs = require('fs');
+const path = require('path');
 /**
 * Reads from a config file and returns the content as a config object
 */
 class Config {
-constructor(config) {
-this.component = config.component;
+constructor() {
+/*
+* By default, the config file is "config.json" at the root.
+* It can be overridden using the UTAPI_CONFIG_FILE environment var.
+*/
+this._basePath = path.resolve(__dirname, '..');
+this.path = `${this._basePath}/config.json`;
+if (process.env.UTAPI_CONFIG_FILE !== undefined) {
+this.path = process.env.UTAPI_CONFIG_FILE;
+}
+// Read config automatically
+this._getConfig();
+}
+_getConfig() {
+let config;
+try {
+const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
+config = JSON.parse(data);
+} catch (err) {
+throw new Error(`could not parse config file: ${err.message}`);
+}
 this.port = 9500;
 if (config.port !== undefined) {
@ -40,17 +61,16 @@ class Config {
 this.healthChecks = { allowFrom: ['127.0.0.1/8', '::1'] };
 if (config.healthChecks && config.healthChecks.allowFrom) {
 assert(Array.isArray(config.healthChecks.allowFrom),
-'config: invalid healthcheck configuration. allowFrom must '
-+ 'be an array');
+'config: invalid healthcheck configuration. allowFrom must ' +
+'be an array');
 config.healthChecks.allowFrom.forEach(item => {
 assert(typeof item === 'string',
-'config: invalid healthcheck configuration. allowFrom IP '
-+ 'address must be a string');
+'config: invalid healthcheck configuration. allowFrom IP ' +
+'address must be a string');
 });
 // augment to the defaults
 this.healthChecks.allowFrom = this.healthChecks.allowFrom.concat(
-config.healthChecks.allowFrom,
-);
+config.healthChecks.allowFrom);
 }
 // default to standalone configuration
 this.redis = { host: '127.0.0.1', port: 6379 };
@ -72,11 +92,6 @@
 'bad config: sentinel port must be a number');
 this.redis.sentinels.push({ host, port });
 });
-if (config.redis.sentinelPassword !== undefined) {
-assert(typeof config.redis.sentinelPassword === 'string',
-'bad config: redis.sentinelPassword must be a string');
-this.redis.sentinelPassword = config.redis.sentinelPassword;
-}
 } else {
 // check for standalone configuration
 assert(typeof config.redis.host === 'string',
@ -93,13 +108,6 @@
 }
 }
-if (config.vaultclient) {
-// Instance passed from outside
-this.vaultclient = config.vaultclient;
-this.vaultd = null;
-} else {
-// Connection data
-this.vaultclient = null;
 this.vaultd = {};
 if (config.vaultd) {
 if (config.vaultd.port !== undefined) {
@ -114,30 +122,34 @@
 this.vaultd.host = config.vaultd.host;
 }
 }
-}
 if (config.certFilePaths) {
-assert(typeof config.certFilePaths === 'object'
-&& typeof config.certFilePaths.key === 'string'
-&& typeof config.certFilePaths.cert === 'string' && ((
-config.certFilePaths.ca
-&& typeof config.certFilePaths.ca === 'string')
-|| !config.certFilePaths.ca));
+assert(typeof config.certFilePaths === 'object' &&
+typeof config.certFilePaths.key === 'string' &&
+typeof config.certFilePaths.cert === 'string' && ((
+config.certFilePaths.ca &&
+typeof config.certFilePaths.ca === 'string') ||
+!config.certFilePaths.ca)
+);
 }
-const { key, cert, ca } = config.certFilePaths
-? config.certFilePaths : {};
+const { key, cert, ca } = config.certFilePaths ?
+config.certFilePaths : {};
 if (key && cert) {
-const keypath = key;
-const certpath = cert;
-let capath;
+const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
+const certpath = (cert[0] === '/') ?
+cert : `${this._basePath}/${cert}`;
+let capath = undefined;
 if (ca) {
-capath = ca;
+capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
-assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
+assert.doesNotThrow(() =>
+fs.accessSync(capath, fs.F_OK | fs.R_OK),
 `File not found or unreachable: ${capath}`);
 }
-assert.doesNotThrow(() => fs.accessSync(keypath, fs.F_OK | fs.R_OK),
+assert.doesNotThrow(() =>
+fs.accessSync(keypath, fs.F_OK | fs.R_OK),
 `File not found or unreachable: ${keypath}`);
-assert.doesNotThrow(() => fs.accessSync(certpath, fs.F_OK | fs.R_OK),
+assert.doesNotThrow(() =>
+fs.accessSync(certpath, fs.F_OK | fs.R_OK),
 `File not found or unreachable: ${certpath}`);
 this.https = {
 cert: fs.readFileSync(certpath, 'ascii'),
@ -149,21 +161,16 @@
 cert: certpath,
 };
 } else if (key || cert) {
-throw new Error('bad config: both certFilePaths.key and '
-+ 'certFilePaths.cert must be defined');
+throw new Error('bad config: both certFilePaths.key and ' +
+'certFilePaths.cert must be defined');
 }
 if (config.expireMetrics !== undefined) {
-assert(typeof config.expireMetrics === 'boolean', 'bad config: '
-+ 'expireMetrics must be a boolean');
+assert(typeof config.expireMetrics === 'boolean', 'bad config: ' +
+'expireMetrics must be a boolean');
 this.expireMetrics = config.expireMetrics;
 }
+return config;
-if (config.onlyCountLatestWhenObjectLocked !== undefined) {
-assert(typeof config.onlyCountLatestWhenObjectLocked === 'boolean',
-'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
-this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
-}
 }
 }
-module.exports = Config;
+module.exports = new Config();

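A hedged sketch of how the two Config variants above are consumed differently (the require path and the option values are hypothetical; only the `module.exports` lines and fields visible in the diff are relied on):

```javascript
// '-' side exports the class, so callers construct it with an explicit object
// (field names taken from the diff; the values here are made up).
const Config = require('./lib/Config'); // assumed path
const config = new Config({ component: 's3', port: 9500 });

// '+' side exports a ready-made singleton that has already read config.json
// (or the file named by UTAPI_CONFIG_FILE) at require time.
const configInstance = require('./lib/Config'); // same path, different shape
console.log(configInstance.port);
```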
View File

@ -33,14 +33,11 @@ class Datastore {
 * @return {undefined}
 */
 set(key, value, cb) {
-return this._client.call(
-(backend, done) => backend.set(key, value, done),
-cb,
-);
+return this._client.set(key, value, cb);
 }
 /**
-* Set a lock key, if it does not already exist, with an expiration
+* Set the replay lock key (if it does not already exist) with an expiration
 * @param {string} key - key to set with an expiration
 * @param {string} value - value containing the data
 * @param {string} ttl - time after which the key expires
@ -48,7 +45,7 @@
 */
 setExpire(key, value, ttl) {
 // This method is a Promise because no callback is given.
-return this._client.call(backend => backend.set(key, value, 'EX', ttl, 'NX'));
+return this._client.set(key, value, 'EX', ttl, 'NX');
 }
 /**
@ -57,8 +54,7 @@
 * @return {undefined}
 */
 del(key) {
-// This method is a Promise because no callback is given.
-return this._client.call(backend => backend.del(key));
+return this._client.del(key);
 }
 /**
@ -68,7 +64,7 @@
 * @return {undefined}
 */
 get(key, cb) {
-return this._client.call((backend, done) => backend.get(key, done), cb);
+return this._client.get(key, cb);
 }
 /**
@ -78,7 +74,7 @@
 * @return {undefined}
 */
 incr(key, cb) {
-return this._client.call((backend, done) => backend.incr(key, done), cb);
+return this._client.incr(key, cb);
 }
 /**
@ -89,7 +85,7 @@
 * @return {undefined}
 */
 incrby(key, value, cb) {
-return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
+return this._client.incrby(key, value, cb);
 }
 /**
@ -99,7 +95,7 @@
 * @return {undefined}
 */
 decr(key, cb) {
-return this._client.call((backend, done) => backend.decr(key, done), cb);
+return this._client.decr(key, cb);
 }
 /**
@ -110,7 +106,7 @@
 * @return {undefined}
 */
 decrby(key, value, cb) {
-return this._client.call((backend, done) => backend.decrby(key, value, done), cb);
+return this._client.decrby(key, value, cb);
 }
 /**
@ -122,7 +118,7 @@
 * @return {undefined}
 */
 zadd(key, score, value, cb) {
-return this._client.call((backend, done) => backend.zadd(key, score, value, done), cb);
+return this._client.zadd(key, score, value, cb);
 }
 /**
@ -135,7 +131,7 @@
 * @return {undefined}
 */
 zrange(key, min, max, cb) {
-return this._client.call((backend, done) => backend.zrange(key, min, max, done), cb);
+return this._client.zrange(key, min, max, cb);
 }
 /**
@ -148,7 +144,7 @@
 * @return {undefined}
 */
 zrangebyscore(key, min, max, cb) {
-return this._client.call((backend, done) => backend.zrangebyscore(key, min, max, done), cb);
+return this._client.zrangebyscore(key, min, max, cb);
 }
 /**
@ -161,12 +157,8 @@
 * @return {undefined}
 */
 bZrangebyscore(keys, min, max, cb) {
-return this._client.call(
-(backend, done) => backend
-.pipeline(keys.map(item => ['zrangebyscore', item, min, max]))
-.exec(done),
-cb,
-);
+return this._client.pipeline(keys.map(
+item => ['zrangebyscore', item, min, max])).exec(cb);
 }
 /**
@ -176,9 +168,7 @@
 * @return {undefined}
 */
 batch(cmds, cb) {
-return this._client.call((backend, done) => {
-backend.multi(cmds).exec(done);
-}, cb);
+return this._client.multi(cmds).exec(cb);
 }
 /**
@ -188,7 +178,7 @@
 * @return {undefined}
 */
 pipeline(cmds, cb) {
-return this._client.call((backend, done) => backend.pipeline(cmds).exec(done), cb);
+return this._client.pipeline(cmds).exec(cb);
 }
 /**
@ -198,10 +188,9 @@
 * @return {undefined}
 */
 multi(cmds, cb) {
-return this._client.call((backend, done) =>
-backend.multi(cmds).exec((err, res) => {
+return this._client.multi(cmds).exec((err, res) => {
 if (err) {
-return done(err);
+return cb(err);
 }
 const flattenRes = [];
 const resErr = res.filter(item => {
@ -209,10 +198,10 @@
 return item[0] !== null;
 });
 if (resErr && resErr.length > 0) {
-return done(resErr);
+return cb(resErr);
 }
-return done(null, flattenRes);
+return cb(null, flattenRes);
-}), cb);
+});
 }
 /**
@ -225,7 +214,7 @@
 * @return {undefined}
 */
 zremrangebyscore(key, min, max, cb) {
-return this._client.call((backend, done) => backend.zremrangebyscore(key, min, max, done), cb);
+return this._client.zremrangebyscore(key, min, max, cb);
 }
 /**
@ -236,7 +225,7 @@
 * @return {undefined}
 */
 lpush(key, val, cb) {
-return this._client.call((backend, done) => backend.lpush(key, val, done), cb);
+return this._client.lpush(key, val, cb);
 }
 /**
@ -246,7 +235,7 @@
 * @return {undefined}
 */
 rpop(key, cb) {
-return this._client.call((backend, done) => backend.rpop(key, done), cb);
+return this._client.rpop(key, cb);
 }
 /**
@ -258,7 +247,7 @@
 * @return {undefined}
 */
 lrange(key, start, stop, cb) {
-return this._client.call((backend, done) => backend.lrange(key, start, stop, done), cb);
+return this._client.lrange(key, start, stop, cb);
 }
 /**
@ -268,7 +257,7 @@
 * @return {undefined}
 */
 llen(key, cb) {
-return this._client.call((backend, done) => backend.llen(key, done), cb);
+return this._client.llen(key, cb);
 }
 /**
@ -279,7 +268,7 @@
 * @return {undefined}
 */
 publish(channel, message, cb) {
-return this._client.call((backend, done) => backend.publish(channel, message, done), cb);
+return this._client.publish(channel, message, cb);
 }
 /**
@ -290,7 +279,7 @@
 * @return {undefined}
 */
 scan(cursor, pattern, cb) {
-return this._client.call((backend, done) => backend.scan(cursor, 'match', pattern, done), cb);
+return this._client.scan(cursor, 'match', pattern, cb);
} }
} }
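
For reference, a minimal usage sketch of the Datastore wrapper above. The require paths and helper names are the ones visible in this compare; the { host, port } Redis config shape passed to redisClient is an assumption.

    const werelogs = require('werelogs');
    const Datastore = require('./Datastore');
    const redisClient = require('../utils/redisClient');

    const log = new werelogs.Logger('DatastoreExample');
    const ds = new Datastore()
        .setClient(redisClient({ host: '127.0.0.1', port: 6379 }, log)); // config shape assumed

    // batch() hands the command arrays to multi().exec(), so each entry of
    // `results` is an [err, value] pair in command order.
    ds.batch([
        ['incr', 'example:numberOfObjectsCounter'],
        ['incrby', 'example:storageUtilizedCounter', 1024],
    ], (err, results) => {
        if (err) {
            return log.error('batch failed', { error: err });
        }
        return log.info('batch applied', { results });
    });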

View File

@ -1,18 +1,15 @@
/* eslint-disable prefer-spread */
/* eslint-disable prefer-destructuring */
/* eslint-disable class-methods-use-this */
/* eslint-disable no-mixed-operators */
const async = require('async'); const async = require('async');
const { errors } = require('arsenal'); const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema'); const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse'); const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month. const Vault = require('./Vault');
/** /**
* Provides methods to get metrics of different levels * Provides methods to get metrics of different levels
*/ */
class ListMetrics { class ListMetrics {
/** /**
* Assign the metric property to an instance of this class * Assign the metric property to an instance of this class
* @param {string} metric - The metric type (e.g., 'buckets', 'accounts') * @param {string} metric - The metric type (e.g., 'buckets', 'accounts')
@ -21,6 +18,7 @@ class ListMetrics {
constructor(metric, component) { constructor(metric, component) {
this.metric = metric; this.metric = metric;
this.service = component; this.service = component;
this.vault = new Vault(config);
} }
/** /**
@ -80,10 +78,9 @@ class ListMetrics {
const resources = validator.get(this.metric); const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange'); const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids // map account ids to canonical ids
if (this.metric === 'accounts') { if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => { return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) { if (err) {
return cb(err); return cb(err);
} }
@ -96,11 +93,14 @@ class ListMetrics {
return next(null, Object.assign({}, res, return next(null, Object.assign({}, res,
{ accountId: item.accountId })); { accountId: item.accountId }));
}), }),
cb); cb
);
}); });
} }
return async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log, return async.mapLimit(resources, 5, (resource, next) =>
next), cb); this.getMetrics(resource, timeRange, datastore, log,
next), cb
);
} }
/** /**
@ -122,29 +122,10 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end]; const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault(); async.mapLimit(resources, 5, (resource, next) =>
this.getMetrics(resource, timeRange, datastore, log,
// map account ids to canonical ids next), cb
if (this.metric === 'accounts') { );
return vault.getCanonicalIds(resources, log, (err, list) => {
if (err) {
return cb(err);
}
return async.mapLimit(list.message.body, 5,
(item, next) => this.getMetrics(item.canonicalId, timeRange,
datastore, log, (err, res) => {
if (err) {
return next(err);
}
return next(null, Object.assign({}, res,
{ accountId: item.accountId }));
}),
cb);
});
}
return async.mapLimit(resources, 5, (resource, next) => this.getMetrics(resource, timeRange, datastore, log,
next), cb);
} }
/** /**
@ -161,60 +142,11 @@ class ListMetrics {
res.push(last); res.push(last);
const d = new Date(last); const d = new Date(last);
last = d.setMinutes(d.getMinutes() + 15); last = d.setMinutes(d.getMinutes() + 15);
if (process.env.UTAPI_INTERVAL_TEST_MODE === 'true') {
last = d.setSeconds(d.getSeconds() + 15);
}
} }
res.push(end); res.push(end);
return res; return res;
} }
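
The 15-minute bucketing above can be read as the following standalone sketch; the loop header is reconstructed from the body shown in this hunk.

    // Enumerate the 15-minute interval timestamps covering [start, end].
    function getTimestampRange(start, end) {
        const res = [];
        let last = start;
        while (last < end) {
            res.push(last);
            const d = new Date(last);
            last = d.setMinutes(d.getMinutes() + 15);
        }
        res.push(end);
        return res;
    }

    // getTimestampRange(12:00, 12:40) -> [12:00, 12:15, 12:30, 12:40]
    // (epoch milliseconds; the final entry is always the raw `end` value).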
_buildSubRanges(range) {
let start = range[0];
const end = range[1] || Date.now();
const subRangesCount = Math.floor((end - start) / MAX_RANGE_MS) + 1;
const subRanges = [];
// eslint-disable-next-line no-plusplus
for (let i = 0; i < subRangesCount; i++) {
if (i + 1 === subRangesCount) {
subRanges.push([start, end]);
} else {
subRanges.push([start, (start + (MAX_RANGE_MS - 1))]);
start += MAX_RANGE_MS;
}
}
return subRanges;
}
_reduceResults(results) {
const reducer = (accumulator, current) => {
const result = Object.assign({}, accumulator);
result.timeRange[1] = current.timeRange[1];
result.storageUtilized[1] = current.storageUtilized[1];
result.numberOfObjects[1] = current.numberOfObjects[1];
result.incomingBytes += current.incomingBytes;
result.outgoingBytes += current.outgoingBytes;
const operations = Object.keys(result.operations);
operations.forEach(operation => {
result.operations[operation] += current.operations[operation];
});
return result;
};
return results.reduce(reducer);
}
getMetrics(resource, range, datastore, log, cb) {
const ranges = this._buildSubRanges(range);
async.mapLimit(ranges, 5, (subRange, next) => this._getMetricsRange(resource, subRange, datastore, log, next),
(err, results) => {
if (err) {
return cb(err);
}
const response = this._reduceResults(results);
return cb(null, response);
});
}
/** /**
* Callback for getting metrics for a single resource * Callback for getting metrics for a single resource
* @callback ListMetrics~getMetricsCb * @callback ListMetrics~getMetricsCb
@ -246,9 +178,9 @@ class ListMetrics {
* @param {ListMetrics~getMetricsCb} cb - callback * @param {ListMetrics~getMetricsCb} cb - callback
* @return {undefined} * @return {undefined}
*/ */
_getMetricsRange(resource, range, datastore, log, cb) { getMetrics(resource, range, datastore, log, cb) {
const start = range[0]; const start = range[0];
const end = range[1]; const end = range[1] || Date.now();
const obj = this._getSchemaObject(resource); const obj = this._getSchemaObject(resource);
// find nearest neighbors for absolutes // find nearest neighbors for absolutes
@ -264,8 +196,7 @@ class ListMetrics {
'-inf', 'LIMIT', '0', '1']; '-inf', 'LIMIT', '0', '1'];
const timestampRange = this._getTimestampRange(start, end); const timestampRange = this._getTimestampRange(start, end);
const metricKeys = [].concat.apply([], timestampRange.map( const metricKeys = [].concat.apply([], timestampRange.map(
i => getKeys(obj, i), i => getKeys(obj, i)));
));
const cmds = metricKeys.map(item => ['get', item]); const cmds = metricKeys.map(item => ['get', item]);
cmds.push(storageUtilizedStart, storageUtilizedEnd, cmds.push(storageUtilizedStart, storageUtilizedEnd,
numberOfObjectsStart, numberOfObjectsEnd); numberOfObjectsStart, numberOfObjectsEnd);
@ -312,10 +243,10 @@ class ListMetrics {
}); });
if (!areMetricsPositive) { if (!areMetricsPositive) {
log.info('negative metric value found', { return cb(errors.InternalError.customizeDescription(
error: resource, 'Utapi is in a transient state for this time period as ' +
method: 'ListMetrics.getMetrics', 'metrics are being collected. Please try again in a few ' +
}); 'minutes.'));
} }
/** /**
* Batch result is of the format * Batch result is of the format
@ -340,8 +271,8 @@ class ListMetrics {
if (m === 'incomingBytes' || m === 'outgoingBytes') { if (m === 'incomingBytes' || m === 'outgoingBytes') {
metricResponse[m] += count; metricResponse[m] += count;
} else { } else {
metricResponse.operations[`${this.service}:${m}`] metricResponse.operations[`${this.service}:${m}`] +=
+= count; count;
} }
} }
}); });
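
Put together, each element returned to the caller carries the fields accumulated above (the s3metricResponse model provides the skeleton). An illustrative shape, with made-up values:

    const example = {
        timeRange: [1561680000000, 1561683600000],
        storageUtilized: [0, 1024],   // [value at range start, value at range end]
        numberOfObjects: [0, 1],
        incomingBytes: 1024,
        outgoingBytes: 0,
        operations: {},               // keyed as `${service}:${metric}`, per the loop above
    };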

View File

@ -1,14 +1,10 @@
/* eslint-disable prefer-destructuring */
/* eslint-disable class-methods-use-this */
/* eslint-disable no-mixed-operators */
const assert = require('assert'); const assert = require('assert');
const { doUntil, parallel } = require('async'); const { doUntil, parallel } = require('async');
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const { errors } = require('arsenal');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const { generateKey, generateCounter, generateStateKey } = require('./schema'); const { generateKey, generateCounter, generateStateKey } = require('./schema');
const redisClientv2 = require('../utils/redisClientv2'); const { errors } = require('arsenal');
const member = require('../utils/member'); const redisClient = require('../utils/redisClient');
const methods = { const methods = {
createBucket: { method: '_genericPushMetric', changesData: true }, createBucket: { method: '_genericPushMetric', changesData: true },
@ -23,76 +19,41 @@ const methods = {
getBucketWebsite: { method: '_genericPushMetric', changesData: false }, getBucketWebsite: { method: '_genericPushMetric', changesData: false },
getBucketLocation: { method: '_genericPushMetric', changesData: false }, getBucketLocation: { method: '_genericPushMetric', changesData: false },
deleteBucketWebsite: { method: '_genericPushMetric', changesData: true }, deleteBucketWebsite: { method: '_genericPushMetric', changesData: true },
uploadPart: { method: '_genericPushMetricUploadPart', changesData: true }, uploadPart: { method: '_pushMetricUploadPart', changesData: true },
uploadPartCopy: { initiateMultipartUpload: { method: '_genericPushMetric',
method: '_genericPushMetricUploadPart', changesData: true },
changesData: true, completeMultipartUpload: { method: '_pushMetricCompleteMultipartUpload',
}, changesData: true },
initiateMultipartUpload: { listMultipartUploads: { method: '_pushMetricListBucketMultipartUploads',
method: '_genericPushMetric', changesData: false },
changesData: true, listMultipartUploadParts: { method: '_genericPushMetric',
}, changesData: false },
completeMultipartUpload: { abortMultipartUpload: { method: '_genericPushMetricDeleteObject',
method: '_pushMetricCompleteMultipartUpload', changesData: true },
changesData: true, deleteObject: { method: '_genericPushMetricDeleteObject',
}, changesData: true },
listMultipartUploads: { multiObjectDelete: { method: '_genericPushMetricDeleteObject',
method: '_pushMetricListBucketMultipartUploads', changesData: true },
changesData: false,
},
listMultipartUploadParts: {
method: '_genericPushMetric',
changesData: false,
},
abortMultipartUpload: {
method: '_genericPushMetricDeleteObject',
changesData: true,
},
deleteObject: {
method: '_genericPushMetricDeleteObject',
changesData: true,
},
multiObjectDelete: {
method: '_genericPushMetricDeleteObject',
changesData: true,
},
getObject: { method: '_pushMetricGetObject', changesData: false }, getObject: { method: '_pushMetricGetObject', changesData: false },
getObjectAcl: { method: '_genericPushMetric', changesData: false }, getObjectAcl: { method: '_genericPushMetric', changesData: false },
getObjectLegalHold: { method: '_genericPushMetric', changesData: false },
getObjectRetention: { method: '_genericPushMetric', changesData: false },
getObjectTagging: { method: '_genericPushMetric', changesData: false }, getObjectTagging: { method: '_genericPushMetric', changesData: false },
putObject: { method: '_genericPushMetricPutObject', changesData: true }, putObject: { method: '_genericPushMetricPutObject', changesData: true },
copyObject: { method: '_genericPushMetricPutObject', changesData: true }, copyObject: { method: '_genericPushMetricPutObject', changesData: true },
putObjectAcl: { method: '_genericPushMetric', changesData: true }, putObjectAcl: { method: '_genericPushMetric', changesData: true },
putObjectLegalHold: { method: '_genericPushMetric', changesData: true },
putObjectRetention: { method: '_genericPushMetric', changesData: true },
putObjectTagging: { method: '_genericPushMetric', changesData: true }, putObjectTagging: { method: '_genericPushMetric', changesData: true },
deleteObjectTagging: { method: '_genericPushMetric', changesData: true }, deleteObjectTagging: { method: '_genericPushMetric', changesData: true },
headBucket: { method: '_genericPushMetric', changesData: false }, headBucket: { method: '_genericPushMetric', changesData: false },
headObject: { method: '_genericPushMetric', changesData: false }, headObject: { method: '_genericPushMetric', changesData: false },
putBucketVersioning: { method: '_genericPushMetric', changesData: true }, putBucketVersioning: { method: '_genericPushMetric', changesData: true },
getBucketVersioning: { method: '_genericPushMetric', changesData: false }, getBucketVersioning: { method: '_genericPushMetric', changesData: false },
putDeleteMarkerObject: { putDeleteMarkerObject: { method: '_pushMetricDeleteMarkerObject',
method: '_pushMetricDeleteMarkerObject', changesData: true },
changesData: true, putBucketReplication: { method: '_genericPushMetric',
}, changesData: true },
putBucketReplication: { getBucketReplication: { method: '_genericPushMetric',
method: '_genericPushMetric', changesData: false },
changesData: true, deleteBucketReplication: { method: '_genericPushMetric',
}, changesData: true },
getBucketReplication: {
method: '_genericPushMetric',
changesData: false,
},
deleteBucketReplication: {
method: '_genericPushMetric',
changesData: true,
},
putBucketObjectLock: { method: '_genericPushMetric', changesData: true },
getBucketObjectLock: { method: '_genericPushMetric', changesData: true },
replicateObject: { method: '_genericPushMetricPutObject', changesData: true },
replicateTags: { method: '_genericPushMetric', changesData: true },
replicateDelete: { method: '_pushMetricDeleteMarkerObject', changesData: true },
}; };
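
The table above drives a simple lookup in pushMetric; a hedged fragment of that dispatch, as it would appear inside UtapiClient.pushMetric (metric, params, timestamp, log and callback are the values available there):

    const entry = methods[metric];   // e.g. methods.putObject
    if (entry && entry.method) {
        // Every handler in this file shares the signature
        // (params, timestamp, action, log, callback).
        return this[entry.method](params, timestamp, metric, log, callback);
    }
    log.debug(`UtapiClient::pushMetric: ${metric} unsupported`);
    return callback();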
const metricObj = { const metricObj = {
@ -125,42 +86,39 @@ class UtapiClient {
// By default, we push all resource types // By default, we push all resource types
this.metrics = ['buckets', 'accounts', 'users', 'service', 'location']; this.metrics = ['buckets', 'accounts', 'users', 'service', 'location'];
this.service = 's3'; this.service = 's3';
this.disableOperationCounters = false;
this.enabledOperationCounters = [];
this.disableClient = true; this.disableClient = true;
if (config && !config.disableClient) { if (config) {
this.disableClient = false; this.disableClient = false;
this.expireMetrics = config.expireMetrics; this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0; this.expireTTL = config.expireTTL || 0;
if (config.metrics) { if (config.metrics) {
const message = 'invalid property in UtapiClient configuration'; const message = 'invalid property in UtapiClient configuration';
assert(Array.isArray(config.metrics), `${message}: metrics ` assert(Array.isArray(config.metrics), `${message}: metrics ` +
+ 'must be an array'); 'must be an array');
assert(config.metrics.length !== 0, `${message}: metrics ` assert(config.metrics.length !== 0, `${message}: metrics ` +
+ 'array cannot be empty'); 'array cannot be empty');
// if location is the only metric, pushMetric should be disabled
if (config.metrics.length === 1 &&
config.metrics[0] === 'location') {
this.disableClient = true;
}
this.metrics = config.metrics; this.metrics = config.metrics;
} }
if (config.redis) { if (config.redis) {
this.ds = new Datastore() this.ds = new Datastore()
.setClient(redisClientv2(config.redis, this.log)); .setClient(redisClient(config.redis, this.log));
} }
if (config.localCache) { if (config.localCache) {
this.localCache = new Datastore() this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache, this.log)); .setClient(redisClient(config.localCache, this.log));
} }
if (config.component) { if (config.component) {
// The configuration uses the property `component`, while // The configuration uses the property `component`, while
// internally this is known as a metric level `service`. // internally this is known as a metric level `service`.
this.service = config.component; this.service = config.component;
} }
if (config.disableOperationCounters) {
this.disableOperationCounters = config.disableOperationCounters;
}
if (config.enabledOperationCounters) {
this.enabledOperationCounters = config.enabledOperationCounters;
}
} }
} }
@ -172,10 +130,6 @@ class UtapiClient {
static getNormalizedTimestamp() { static getNormalizedTimestamp() {
const d = new Date(); const d = new Date();
const minutes = d.getMinutes(); const minutes = d.getMinutes();
if (process.env.UTAPI_INTERVAL_TEST_MODE === 'true') {
const seconds = d.getSeconds();
return d.setSeconds((seconds - seconds % 15), 0, 0);
}
return d.setMinutes((minutes - minutes % 15), 0, 0); return d.setMinutes((minutes - minutes % 15), 0, 0);
} }
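
A worked example of the normalization above: every event timestamp is pinned to the start of its quarter hour before schema keys are built.

    const d = new Date(2019, 5, 28, 12, 41, 7);    // local time 12:41:07
    const minutes = d.getMinutes();                 // 41
    d.setMinutes(minutes - (minutes % 15), 0, 0);   // -> 12:30:00.000 the same day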
@ -190,13 +144,6 @@ class UtapiClient {
return this; return this;
} }
_isCounterEnabled(action) {
if (this.enabledOperationCounters.length > 0) {
return this.enabledOperationCounters.some(counter => counter.toLowerCase() === action.toLowerCase());
}
return this.disableOperationCounters === false;
}
/* /*
* Utility function to use when callback is not defined * Utility function to use when callback is not defined
*/ */
@ -214,21 +161,16 @@ class UtapiClient {
*/ */
_pushLocalCache(params, operation, timestamp, log, cb) { _pushLocalCache(params, operation, timestamp, log, cb) {
// 'listMultipartUploads' has a different name in the metric response. // 'listMultipartUploads' has a different name in the metric response.
const action = operation === 'listBucketMultipartUploads' const action = operation === 'listBucketMultipartUploads' ?
? 'listMultipartUploads' : operation; 'listMultipartUploads' : operation;
const logObject = { const logObject = { method: 'UtapiClient._pushLocalCache', action,
method: 'UtapiClient._pushLocalCache', params };
action,
params,
};
if (!this.localCache) { if (!this.localCache) {
log.fatal('failed to push metrics', logObject); log.fatal('failed to push metrics', logObject);
return cb(errors.InternalError); return cb(errors.InternalError);
} }
const reqUid = log.getSerializedUids(); const reqUid = log.getSerializedUids();
const value = JSON.stringify({ const value = JSON.stringify({ action, reqUid, params, timestamp });
action, reqUid, params, timestamp,
});
return this.localCache.lpush('s3:utapireplay', value, err => { return this.localCache.lpush('s3:utapireplay', value, err => {
if (err) { if (err) {
log.error('error inserting data in local cache', logObject); log.error('error inserting data in local cache', logObject);
@ -254,15 +196,15 @@ class UtapiClient {
*/ */
_checkProperties(params, properties = []) { _checkProperties(params, properties = []) {
properties.forEach(prop => { properties.forEach(prop => {
assert(params[prop] !== undefined, 'Metric object must include ' assert(params[prop] !== undefined, 'Metric object must include ' +
+ `${prop} property`); `${prop} property`);
if (prop === 'oldByteLength') { if (prop === 'oldByteLength') {
assert(typeof params[prop] === 'number' assert(typeof params[prop] === 'number' ||
|| params[prop] === null, 'oldByteLength property must be ' params[prop] === null, 'oldByteLength property must be ' +
+ 'an integer or `null`'); 'an integer or `null`');
} else { } else {
assert(typeof params[prop] === 'number', `${prop} property ` assert(typeof params[prop] === 'number', `${prop} property ` +
+ 'must be an integer'); 'must be an integer');
} }
}); });
} }
@ -281,8 +223,8 @@ class UtapiClient {
// Object of metric types and their associated property names // Object of metric types and their associated property names
this.metrics.forEach(level => { this.metrics.forEach(level => {
const propName = metricObj[level]; const propName = metricObj[level];
assert(typeof params[propName] === 'string' assert(typeof params[propName] === 'string' ||
|| params[propName] === undefined, params[propName] === undefined,
`${propName} must be a string`); `${propName} must be a string`);
}); });
} }
@ -319,9 +261,8 @@ class UtapiClient {
_getParamsArr(params) { _getParamsArr(params) {
this._checkMetricTypes(params); this._checkMetricTypes(params);
const props = []; const props = [];
const { const { byteLength, newByteLength, oldByteLength, numberOfObjects } =
byteLength, newByteLength, oldByteLength, numberOfObjects, params;
} = params;
// We add a `service` property to any non-service level to be able to // We add a `service` property to any non-service level to be able to
// build the appropriate schema key. // build the appropriate schema key.
this.metrics.forEach(level => { this.metrics.forEach(level => {
@ -417,8 +358,7 @@ class UtapiClient {
log, callback); log, callback);
} }
return callback(); return callback();
}, });
);
} }
log.debug(`UtapiClient::pushMetric: ${metric} unsupported`); log.debug(`UtapiClient::pushMetric: ${metric} unsupported`);
return callback(); return callback();
@ -436,9 +376,6 @@ class UtapiClient {
* @return {undefined} * @return {undefined}
*/ */
_genericPushMetric(params, timestamp, action, log, callback) { _genericPushMetric(params, timestamp, action, log, callback) {
if (!this._isCounterEnabled(action)) {
return process.nextTick(callback);
}
this._checkProperties(params); this._checkProperties(params);
this._logMetric(params, '_genericPushMetric', timestamp, log); this._logMetric(params, '_genericPushMetric', timestamp, log);
const cmds = this._getParamsArr(params) const cmds = this._getParamsArr(params)
@ -494,30 +431,30 @@ class UtapiClient {
return done(); return done();
}), }),
// if cursor is 0, it reached end of scan // if cursor is 0, it reached end of scan
cb => cb(null, cursor === '0'), () => cursor === '0',
err => callback(err, keys), err => callback(err, keys)
); );
} }
_expireMetrics(keys, log, callback) { _expireMetrics(keys, log, callback) {
// expire metrics here // expire metrics here
const expireCmds = keys.map(k => ['expire', k, this.expireMetricsTTL]); const expireCmds = keys.map(k => ['expire', k, this.expireTTL]);
return this.ds.multi(expireCmds, (err, result) => { return this.ds.multi(expireCmds, (err, result) => {
if (err) { if (err) {
const logParam = Array.isArray(err) ? { errorList: err } const logParam = Array.isArray(err) ? { errorList: err } :
: { error: err }; { error: err };
log.error('error expiring metrics', logParam); log.error('error expiring metrics', logParam);
return callback(err); return callback(err);
} }
// each delete command gets a score 1 if it's a success, // each delete command gets a score 1 if it's a success,
// should match the total commands sent for deletion // should match the total commands sent for deletion
const allKeysDeleted = keys.length === result.reduce((a, v) => a + v, 0); const allKeysDeleted =
keys.length === result.reduce((a, v) => a + v, 0);
if (!allKeysDeleted) { if (!allKeysDeleted) {
log.debug('error expiring keys', { delResult: result }); log.debug('error expiring keys', { delResult: result });
return callback( return callback(
errors.InternalError.customizeDescription( errors.InternalError.customizeDescription(
'error expiring some keys', 'error expiring some keys')
),
); );
} }
return callback(); return callback();
@ -542,15 +479,12 @@ class UtapiClient {
log); log);
const cmds = []; const cmds = [];
const paramsArr = this._getParamsArr(params); const paramsArr = this._getParamsArr(params);
paramsArr.forEach(p => { paramsArr.forEach(p => cmds.push(
cmds.push(['incr', generateCounter(p, 'numberOfObjectsCounter')]); ['incr', generateCounter(p, 'numberOfObjectsCounter')],
const counterAction = action === 'putDeleteMarkerObject' ? 'deleteObject' : action; ['incr', generateKey(p, 'deleteObject', timestamp)]));
if (this._isCounterEnabled(counterAction)) { // We track the number of commands needed for each `paramsArr` property
cmds.push(['incr', generateKey(p, counterAction, timestamp)]); // to eventually locate each group in the results from Redis.
} const commandsGroupSize = 2;
cmds.push(['zrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp]);
});
return this.ds.batch(cmds, (err, results) => { return this.ds.batch(cmds, (err, results) => {
if (err) { if (err) {
log.error('error pushing metric', { log.error('error pushing metric', {
@ -560,13 +494,9 @@ class UtapiClient {
return this._pushLocalCache(params, action, timestamp, log, cb); return this._pushLocalCache(params, action, timestamp, log, cb);
} }
const cmds2 = []; const cmds2 = [];
// We track the number of commands needed for each `paramsArr`
// property to eventually locate each group in the results from
// Redis.
const commandsGroupSize = (cmds.length / paramsArr.length);
const noErr = paramsArr.every((p, i) => { const noErr = paramsArr.every((p, i) => {
// We want the first element of every group of commands returned // We want the first element of every group of two commands
// from Redis. This contains the value of the // returned from Redis. This contains the value of the
// numberOfObjectsCounter after it has been incremented. // numberOfObjectsCounter after it has been incremented.
const index = i * commandsGroupSize; const index = i * commandsGroupSize;
const actionErr = results[index][0]; const actionErr = results[index][0];
@ -582,50 +512,14 @@ class UtapiClient {
let actionCounter = parseInt(results[index][1], 10); let actionCounter = parseInt(results[index][1], 10);
// If < 0 or NaN, record numberOfObjects as though bucket were // If < 0 or NaN, record numberOfObjects as though bucket were
// empty. // empty.
actionCounter = Number.isNaN(actionCounter) actionCounter = Number.isNaN(actionCounter) ||
|| actionCounter < 0 ? 1 : actionCounter; actionCounter < 0 ? 1 : actionCounter;
if (Number.isInteger(params.byteLength)) {
/* byteLength is passed in from cloudserver under the follow conditions:
* - bucket versioning is suspended
* - object version id is null
* - the content length of the object exists
* In this case, the master key is deleted and replaced with a delete marker.
* The decrement accounts for the deletion of the master key when utapi reports
* on the number of objects.
*/
actionCounter -= 1;
}
const key = generateStateKey(p, 'numberOfObjects'); const key = generateStateKey(p, 'numberOfObjects');
const byteArr = results[index + commandsGroupSize - 1][1];
const oldByteLength = byteArr ? parseInt(byteArr[0], 10) : 0;
const newByteLength = member.serialize(Math.max(0, oldByteLength - params.byteLength));
cmds2.push( cmds2.push(
['zremrangebyscore', key, timestamp, timestamp], ['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(actionCounter)], ['zadd', key, timestamp, actionCounter]);
);
if (Number.isInteger(params.byteLength)) {
cmds2.push(
['decr', generateCounter(p, 'numberOfObjectsCounter')],
['decrby', generateCounter(p, 'storageUtilizedCounter'), params.byteLength],
);
}
if (byteArr) {
cmds2.push(
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), timestamp, newByteLength],
);
}
return true; return true;
}); });
if (noErr) { if (noErr) {
return this.ds.batch(cmds2, cb); return this.ds.batch(cmds2, cb);
} }
@ -634,7 +528,7 @@ class UtapiClient {
} }
/** /**
* Updates counter for UploadPart or UploadPartCopy action * Updates counter for UploadPart action
* @param {object} params - params for the metrics * @param {object} params - params for the metrics
* @param {string} [params.bucket] - (optional) bucket name * @param {string} [params.bucket] - (optional) bucket name
* @param {string} [params.accountId] - (optional) account ID * @param {string} [params.accountId] - (optional) account ID
@ -645,9 +539,9 @@ class UtapiClient {
* @param {callback} callback - callback to call * @param {callback} callback - callback to call
* @return {undefined} * @return {undefined}
*/ */
_genericPushMetricUploadPart(params, timestamp, action, log, callback) { _pushMetricUploadPart(params, timestamp, action, log, callback) {
this._checkProperties(params, ['newByteLength', 'oldByteLength']); this._checkProperties(params, ['newByteLength', 'oldByteLength']);
this._logMetric(params, '_genericPushMetricUploadPart', timestamp, log); this._logMetric(params, '_pushMetricUploadPart', timestamp, log);
const cmds = []; const cmds = [];
const { newByteLength, oldByteLength } = params; const { newByteLength, oldByteLength } = params;
const oldObjSize = oldByteLength === null ? 0 : oldByteLength; const oldObjSize = oldByteLength === null ? 0 : oldByteLength;
@ -659,16 +553,14 @@ class UtapiClient {
storageUtilizedDelta], storageUtilizedDelta],
['incrby', generateKey(p, 'incomingBytes', timestamp), ['incrby', generateKey(p, 'incomingBytes', timestamp),
newByteLength], newByteLength],
['incr', generateKey(p, action, timestamp)]
); );
if (this._isCounterEnabled(action)) {
cmds.push(['incr', generateKey(p, action, timestamp)]);
}
}); });
// update counters // update counters
return this.ds.batch(cmds, (err, results) => { return this.ds.batch(cmds, (err, results) => {
if (err) { if (err) {
log.error('error pushing metric', { log.error('error pushing metric', {
method: 'UtapiClient._genericPushMetricUploadPart', method: 'UtapiClient._pushMetricUploadPart',
error: err, error: err,
}); });
return this._pushLocalCache(params, action, timestamp, log, return this._pushLocalCache(params, action, timestamp, log,
@ -689,11 +581,11 @@ class UtapiClient {
actionErr = results[index][0]; actionErr = results[index][0];
actionCounter = parseInt(results[index][1], 10); actionCounter = parseInt(results[index][1], 10);
// If < 0, record storageUtilized as though bucket were empty. // If < 0, record storageUtilized as though bucket were empty.
actionCounter = actionCounter < 0 ? storageUtilizedDelta actionCounter = actionCounter < 0 ? storageUtilizedDelta :
: actionCounter; actionCounter;
if (actionErr) { if (actionErr) {
log.error('error incrementing counter for push metric', { log.error('error incrementing counter for push metric', {
method: 'UtapiClient._genericPushMetricUploadPart', method: 'UtapiClient._pushMetricUploadPart',
metric: 'storage utilized', metric: 'storage utilized',
error: actionErr, error: actionErr,
}); });
@ -705,7 +597,7 @@ class UtapiClient {
['zremrangebyscore', generateStateKey(p, 'storageUtilized'), ['zremrangebyscore', generateStateKey(p, 'storageUtilized'),
timestamp, timestamp], timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), ['zadd', generateStateKey(p, 'storageUtilized'),
timestamp, member.serialize(actionCounter)], timestamp, actionCounter]
); );
return true; return true;
}); });
@ -716,57 +608,6 @@ class UtapiClient {
}); });
} }
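
A worked example of the byte accounting above, overwriting an existing 5 MiB part with an 8 MiB one; the delta computation itself is assumed from the oldObjSize/newByteLength handling shown in this hunk.

    const oldByteLength = 5 * 1024 * 1024;   // size of the part being overwritten (null if none)
    const newByteLength = 8 * 1024 * 1024;   // size of the uploaded part
    const oldObjSize = oldByteLength === null ? 0 : oldByteLength;
    const storageUtilizedDelta = newByteLength - oldObjSize;   // 3 MiB added to storageUtilized
    // incomingBytes is still incremented by the full newByteLength (8 MiB).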
_multipartUploadOverwrite(params, timestamp, action, log, cb) {
const counterCommands = [];
const levels = this._getParamsArr(params);
levels.forEach(level => {
const key = generateCounter(level, 'storageUtilizedCounter');
counterCommands.push(['decrby', key, params.oldByteLength]);
if (this._isCounterEnabled(action)) {
const key = generateKey(level, action, timestamp);
counterCommands.push(['incr', key]);
}
});
return this.ds.batch(counterCommands, (err, res) => {
if (err) {
log.error('error decrementing counter for push metric', {
method: 'UtapiClient._multipartUploadOverwrite',
error: err,
});
return this._pushLocalCache(params, action, timestamp, log, cb);
}
const commandErr = res.find(i => i[0]);
if (commandErr) {
log.error('error decrementing counter for push metric', {
method: 'UtapiClient._multipartUploadOverwrite',
error: commandErr,
});
return this._pushLocalCache(params, action, timestamp, log, cb);
}
const sortedSetCommands = [];
levels.forEach((level, i) => {
const key = generateStateKey(level, 'storageUtilized');
// We want the result of the storage utilized counter update
// that is the first of every group of levels.
const groupSize = counterCommands.length / levels.length;
const value = res[i * groupSize][1];
const storageUtilized = Number.parseInt(value, 10);
sortedSetCommands.push(
['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(storageUtilized)],
);
});
return this.ds.batch(sortedSetCommands, cb);
});
}
/** /**
* Updates counter for CompleteMultipartUpload action * Updates counter for CompleteMultipartUpload action
* @param {object} params - params for the metrics * @param {object} params - params for the metrics
@ -783,25 +624,17 @@ class UtapiClient {
this._checkProperties(params); this._checkProperties(params);
this._logMetric(params, '_pushMetricCompleteMultipartUpload', timestamp, this._logMetric(params, '_pushMetricCompleteMultipartUpload', timestamp,
log); log);
// Is the MPU completion overwriting an object?
if (params.oldByteLength !== null) {
return this._multipartUploadOverwrite(
params, timestamp, action, log, callback,
);
}
const paramsArr = this._getParamsArr(params); const paramsArr = this._getParamsArr(params);
const cmds = []; const cmds = [];
paramsArr.forEach(p => { paramsArr.forEach(p => {
cmds.push(['incr', generateCounter(p, 'numberOfObjectsCounter')]); cmds.push(
if (this._isCounterEnabled(action)) { ['incr', generateCounter(p, 'numberOfObjectsCounter')],
cmds.push(['incr', generateKey(p, action, timestamp)]); ['incr', generateKey(p, action, timestamp)]
} );
}); });
// We track the number of commands needed for each `paramsArr` object to // We track the number of commands needed for each `paramsArr` object to
// eventually locate each group in the results from Redis. // eventually locate each group in the results from Redis.
const commandsGroupSize = (cmds.length / paramsArr.length); const commandsGroupSize = 2;
return this.ds.batch(cmds, (err, results) => { return this.ds.batch(cmds, (err, results) => {
if (err) { if (err) {
log.error('error incrementing counter for push metric', { log.error('error incrementing counter for push metric', {
@ -827,8 +660,8 @@ class UtapiClient {
actionCounter = actionCounter < 0 ? 1 : actionCounter; actionCounter = actionCounter < 0 ? 1 : actionCounter;
if (actionErr) { if (actionErr) {
log.error('error incrementing counter for push metric', { log.error('error incrementing counter for push metric', {
method: 'UtapiClient._pushMetricCompleteMultipart' method: 'UtapiClient._pushMetricCompleteMultipart' +
+ 'Upload', 'Upload',
metric: 'number of objects', metric: 'number of objects',
error: actionErr, error: actionErr,
}); });
@ -838,7 +671,7 @@ class UtapiClient {
} }
key = generateStateKey(p, 'numberOfObjects'); key = generateStateKey(p, 'numberOfObjects');
cmds2.push(['zremrangebyscore', key, timestamp, timestamp], cmds2.push(['zremrangebyscore', key, timestamp, timestamp],
['zadd', key, timestamp, member.serialize(actionCounter)]); ['zadd', key, timestamp, actionCounter]);
return true; return true;
}); });
if (noErr) { if (noErr) {
@ -880,8 +713,8 @@ class UtapiClient {
* @return {undefined} * @return {undefined}
*/ */
_genericPushMetricDeleteObject(params, timestamp, action, log, callback) { _genericPushMetricDeleteObject(params, timestamp, action, log, callback) {
const expectedProps = action === 'abortMultipartUpload' const expectedProps = action === 'abortMultipartUpload' ?
? ['byteLength'] : ['byteLength', 'numberOfObjects']; ['byteLength'] : ['byteLength', 'numberOfObjects'];
this._checkProperties(params, expectedProps); this._checkProperties(params, expectedProps);
const { byteLength, numberOfObjects } = params; const { byteLength, numberOfObjects } = params;
this._logMetric(params, '_genericPushMetricDeleteObject', timestamp, this._logMetric(params, '_genericPushMetricDeleteObject', timestamp,
@ -894,10 +727,8 @@ class UtapiClient {
cmds.push( cmds.push(
['decrby', generateCounter(p, 'storageUtilizedCounter'), ['decrby', generateCounter(p, 'storageUtilizedCounter'),
byteLength], byteLength],
['incr', generateKey(p, action, timestamp)]
); );
if (this._isCounterEnabled(action)) {
cmds.push(['incr', generateKey(p, action, timestamp)]);
}
// The 'abortMultipartUpload' action affects only storage utilized, // The 'abortMultipartUpload' action affects only storage utilized,
// so number of objects remains unchanged. // so number of objects remains unchanged.
if (action !== 'abortMultipartUpload') { if (action !== 'abortMultipartUpload') {
@ -907,7 +738,7 @@ class UtapiClient {
}); });
// We track the number of commands needed for each `paramsArr` object to // We track the number of commands needed for each `paramsArr` object to
// eventually locate each group in the results from Redis. // eventually locate each group in the results from Redis.
const commandsGroupSize = (cmds.length / paramsArr.length); const commandsGroupSize = action !== 'abortMultipartUpload' ? 3 : 2;
return this.ds.batch(cmds, (err, results) => { return this.ds.batch(cmds, (err, results) => {
if (err) { if (err) {
log.error('error incrementing counter', { log.error('error incrementing counter', {
@ -952,16 +783,15 @@ class UtapiClient {
timestamp, timestamp], timestamp, timestamp],
['zadd', ['zadd',
generateStateKey(p, 'storageUtilized'), timestamp, generateStateKey(p, 'storageUtilized'), timestamp,
member.serialize(actionCounter)], actionCounter]);
);
// The 'abortMultipartUpload' action does not affect number of // The 'abortMultipartUpload' action does not affect number of
// objects, so we return here. // objects, so we return here.
if (action === 'abortMultipartUpload') { if (action === 'abortMultipartUpload') {
return true; return true;
} }
// The number of objects counter result is the last element of // The number of objects counter result is the third element of
// each group of commands. // each group of commands. Thus we add two.
const numberOfObjectsResult = currentResultsGroup + (commandsGroupSize - 1); const numberOfObjectsResult = currentResultsGroup + 2;
actionErr = results[numberOfObjectsResult][0]; actionErr = results[numberOfObjectsResult][0];
actionCounter = parseInt(results[numberOfObjectsResult][1], 10); actionCounter = parseInt(results[numberOfObjectsResult][1], 10);
// If < 0, record numberOfObjects as though bucket were empty. // If < 0, record numberOfObjects as though bucket were empty.
@ -983,8 +813,7 @@ class UtapiClient {
generateStateKey(p, 'numberOfObjects'), timestamp, generateStateKey(p, 'numberOfObjects'), timestamp,
timestamp], timestamp],
['zadd', generateStateKey(p, 'numberOfObjects'), timestamp, ['zadd', generateStateKey(p, 'numberOfObjects'), timestamp,
member.serialize(actionCounter)], actionCounter]);
);
return true; return true;
}); });
if (noErr) { if (noErr) {
@ -1016,10 +845,8 @@ class UtapiClient {
cmds.push( cmds.push(
['incrby', generateKey(p, 'outgoingBytes', timestamp), ['incrby', generateKey(p, 'outgoingBytes', timestamp),
newByteLength], newByteLength],
['incr', generateKey(p, action, timestamp)]
); );
if (this._isCounterEnabled(action)) {
cmds.push(['incr', generateKey(p, action, timestamp)]);
}
}); });
// update counters // update counters
return this.ds.batch(cmds, err => { return this.ds.batch(cmds, err => {
@ -1066,14 +893,12 @@ class UtapiClient {
['incrby', generateCounter(p, 'storageUtilizedCounter'), ['incrby', generateCounter(p, 'storageUtilizedCounter'),
storageUtilizedDelta], storageUtilizedDelta],
[redisCmd, generateCounter(p, 'numberOfObjectsCounter')], [redisCmd, generateCounter(p, 'numberOfObjectsCounter')],
['incr', generateKey(p, action, timestamp)]
); );
if (this._isCounterEnabled(action)) { if (action === 'putObject') {
cmds.push(['incr', generateKey(p, action, timestamp)]);
}
if (action === 'putObject' || action === 'replicateObject') {
cmds.push( cmds.push(
['incrby', generateKey(p, 'incomingBytes', timestamp), ['incrby', generateKey(p, 'incomingBytes', timestamp),
newByteLength], newByteLength]
); );
} }
}); });
@ -1101,8 +926,8 @@ class UtapiClient {
actionErr = results[storageIndex][0]; actionErr = results[storageIndex][0];
actionCounter = parseInt(results[storageIndex][1], 10); actionCounter = parseInt(results[storageIndex][1], 10);
// If < 0, record storageUtilized as though bucket were empty. // If < 0, record storageUtilized as though bucket were empty.
actionCounter = actionCounter < 0 ? storageUtilizedDelta actionCounter = actionCounter < 0 ? storageUtilizedDelta :
: actionCounter; actionCounter;
if (actionErr) { if (actionErr) {
log.error('error incrementing counter for push metric', { log.error('error incrementing counter for push metric', {
method: 'UtapiClient._genericPushMetricPutObject', method: 'UtapiClient._genericPushMetricPutObject',
@ -1118,8 +943,7 @@ class UtapiClient {
generateStateKey(p, 'storageUtilized'), generateStateKey(p, 'storageUtilized'),
timestamp, timestamp], timestamp, timestamp],
['zadd', generateStateKey(p, 'storageUtilized'), ['zadd', generateStateKey(p, 'storageUtilized'),
timestamp, member.serialize(actionCounter)], timestamp, actionCounter]);
);
// number of objects counter // number of objects counter
objectsIndex = (i * (cmdsLen / paramsArrLen)) + 1; objectsIndex = (i * (cmdsLen / paramsArrLen)) + 1;
@ -1128,8 +952,8 @@ class UtapiClient {
// If the key does not exist, actionCounter will be null. // If the key does not exist, actionCounter will be null.
// Hence we check that action counter is a number and is > 0. If // Hence we check that action counter is a number and is > 0. If
// true, we record numberOfObjects as though bucket were empty. // true, we record numberOfObjects as though bucket were empty.
actionCounter = Number.isNaN(actionCounter) actionCounter = Number.isNaN(actionCounter) ||
|| actionCounter < 0 ? 1 : actionCounter; actionCounter < 0 ? 1 : actionCounter;
if (actionErr) { if (actionErr) {
log.error('error incrementing counter for push metric', { log.error('error incrementing counter for push metric', {
method: 'UtapiClient._genericPushMetricPutObject', method: 'UtapiClient._genericPushMetricPutObject',
@ -1145,8 +969,7 @@ class UtapiClient {
generateStateKey(p, 'numberOfObjects'), generateStateKey(p, 'numberOfObjects'),
timestamp, timestamp], timestamp, timestamp],
['zadd', generateStateKey(p, 'numberOfObjects'), ['zadd', generateStateKey(p, 'numberOfObjects'),
timestamp, member.serialize(actionCounter)], timestamp, actionCounter]);
);
return true; return true;
}); });
if (noErr) { if (noErr) {
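
A hedged end-to-end sketch for the client above. The configuration keys come from the constructor shown in this compare; the pushMetric(metric, reqUid, params, callback) signature is assumed.

    const werelogs = require('werelogs');
    const UtapiClient = require('./UtapiClient');

    const log = new werelogs.Logger('UtapiClientExample').newRequestLogger();
    const client = new UtapiClient({
        redis: { host: '127.0.0.1', port: 6379 },       // shape assumed
        localCache: { host: '127.0.0.1', port: 6379 },  // shape assumed
        component: 's3',
    });

    // Record a successful 1 KiB object PUT into "mybucket"; the handler looked
    // up in `methods` updates the counters and the 15-minute sorted-set samples.
    client.pushMetric('putObject', log.getSerializedUids(), {
        bucket: 'mybucket',
        newByteLength: 1024,
        oldByteLength: null,   // no previous version under this key
    }, () => log.info('metric pushed'));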

View File

@ -1,265 +0,0 @@
const childProcess = require('child_process');
const async = require('async');
const nodeSchedule = require('node-schedule');
const { jsutil } = require('arsenal');
const werelogs = require('werelogs');
const Datastore = require('./Datastore');
const RedisClient = require('../libV2/redis');
const REINDEX_SCHEDULE = '0 0 * * Sun';
const REINDEX_LOCK_KEY = 's3:utapireindex:lock';
const REINDEX_LOCK_TTL = (60 * 60) * 24;
const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== undefined
? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex {
constructor(config) {
this._enabled = false;
this._schedule = REINDEX_SCHEDULE;
this._redis = {
name: 'scality-s3',
sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
};
this._bucketd = {
host: '127.0.0.1',
port: 9000,
};
this._password = '';
this._log = new werelogs.Logger('UtapiReindex');
if (config && config.enabled) {
this._enabled = config.enabled;
}
if (config && config.schedule) {
this._schedule = config.schedule;
}
if (config && config.password) {
this._password = config.password;
}
if (config && config.redis) {
const {
name, sentinelPassword, sentinels,
} = config.redis;
this._redis.name = name || this._redis.name;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword;
this._redis.sentinels = sentinels || this._redis.sentinels;
}
if (config && config.bucketd) {
const { host, port } = config.bucketd;
this._bucketd.host = host || this._bucketd.host;
this._bucketd.port = port || this._bucketd.port;
}
if (config && config.log) {
const { level, dump } = config.log;
this._log = new werelogs.Logger('UtapiReindex', { level, dump });
}
this._onlyCountLatestWhenObjectLocked = (config && config.onlyCountLatestWhenObjectLocked === true);
this._requestLogger = this._log.newRequestLogger();
}
_getRedisClient() {
const client = new RedisClient({
sentinels: this._redis.sentinels,
name: this._redis.name,
sentinelPassword: this._redis.sentinelPassword,
password: this._password,
});
client.connect();
return client;
}
_lock() {
return this.ds.setExpire(REINDEX_LOCK_KEY, 'true', REINDEX_LOCK_TTL);
}
_unLock() {
return this.ds.del(REINDEX_LOCK_KEY);
}
_buildFlags(sentinel) {
const flags = {
/* eslint-disable camelcase */
sentinel_ip: sentinel.host,
sentinel_port: sentinel.port,
sentinel_cluster_name: this._redis.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
};
if (this._redis.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword;
}
/* eslint-enable camelcase */
const opts = [];
Object.keys(flags)
.forEach(flag => {
const name = `--${flag.replace(/_/g, '-')}`;
opts.push(name);
opts.push(flags[flag]);
});
if (this._onlyCountLatestWhenObjectLocked) {
opts.push('--only-latest-when-locked');
}
return opts;
}
_runScriptWithSentinels(path, remainingSentinels, done) {
const flags = this._buildFlags(remainingSentinels.shift());
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => {
this._requestLogger.info('received output from script', {
output: Buffer.from(data).toString(),
script: path,
});
});
process.stderr.on('data', data => {
this._requestLogger.debug('received error from script', {
output: Buffer.from(data).toString(),
script: path,
});
});
process.on('error', err => {
this._requestLogger.debug('failed to start process', {
error: err,
script: path,
});
});
process.on('close', code => {
if (code) {
this._requestLogger.error('script exited with error', {
statusCode: code,
script: path,
});
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else {
this._requestLogger.info('script exited successfully', {
statusCode: code,
script: path,
});
}
return done();
});
}
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock()
.then(res => {
if (res) {
this._requestLogger
.info('acquired the lock, proceeding with job');
job();
} else {
this._requestLogger
.info('the lock is already acquired, skipping job');
}
})
.catch(err => {
this._requestLogger.error(
'an error occurred when acquiring the lock, skipping job', {
stack: err && err.stack,
},
);
});
}
_attemptUnlock() {
this._unLock()
.catch(err => {
this._requestLogger
.error('an error occurred when removing the lock', {
stack: err && err.stack,
});
});
}
_connect(done) {
const doneOnce = jsutil.once(done);
const client = this._getRedisClient();
this.ds = new Datastore().setClient(client);
client
.on('ready', doneOnce)
.on('error', doneOnce);
}
_scheduleJob() {
this._connect(err => {
if (err) {
this._requestLogger.error(
'could not connect to datastore, skipping', {
error: err && err.stack,
},
);
return undefined;
}
return this._attemptLock(() => {
const scripts = [
`${__dirname}/reindex/s3_bucketd.py`,
`${__dirname}/reindex/reporting.py`,
];
return async.eachSeries(scripts, (script, next) => {
this._runScript(script, next);
}, () => {
this._attemptUnlock();
});
});
});
}
_job() {
const job = nodeSchedule.scheduleJob(this._schedule, () => this._scheduleJob());
if (!job) {
this._log.error('could not initiate job schedule');
return undefined;
}
job.on('scheduled', () => {
this._requestLogger = this._log.newRequestLogger();
this._requestLogger.info('utapi reindex job scheduled', {
schedule: this._schedule,
});
});
return undefined;
}
start() {
if (this._enabled) {
this._log.info('initiating job schedule', {
schedule: this._schedule,
});
this._job();
} else {
this._log.info('utapi reindex is disabled');
}
return this;
}
}
module.exports = UtapiReindex;
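
The weekly job above serializes itself through the Datastore promise helpers; a minimal sketch of the locking pattern, where ds and log stand for the Datastore and logger set up in _connect, and runReindexJob is a placeholder for the script runner:

    ds.setExpire('s3:utapireindex:lock', 'true', 60 * 60 * 24)
        .then(res => {
            if (!res) {
                // SET ... NX returned null: another instance already holds the lock.
                return log.info('the lock is already acquired, skipping job');
            }
            return runReindexJob(() => ds.del('s3:utapireindex:lock'));
        })
        .catch(err => log.error('an error occurred when acquiring the lock', {
            stack: err && err.stack,
        }));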

View File

@ -1,12 +1,11 @@
/* eslint-disable no-plusplus */
const assert = require('assert'); const assert = require('assert');
const async = require('async'); const async = require('async');
const { scheduleJob } = require('node-schedule'); const { scheduleJob } = require('node-schedule');
const werelogs = require('werelogs');
const UtapiClient = require('./UtapiClient'); const UtapiClient = require('./UtapiClient');
const Datastore = require('./Datastore'); const Datastore = require('./Datastore');
const redisClient = require('../utils/redisClient');
const safeJsonParse = require('../utils/safeJsonParse'); const safeJsonParse = require('../utils/safeJsonParse');
const redisClientv2 = require('../utils/redisClientv2'); const werelogs = require('werelogs');
// Every five minutes. Cron-style scheduling used by node-schedule. // Every five minutes. Cron-style scheduling used by node-schedule.
const REPLAY_SCHEDULE = '*/5 * * * *'; const REPLAY_SCHEDULE = '*/5 * * * *';
@ -36,13 +35,13 @@ class UtapiReplay {
this.disableReplay = true; this.disableReplay = true;
if (config) { if (config) {
const message = 'missing required property in UtapiReplay ' const message = 'missing required property in UtapiReplay ' +
+ 'configuration'; 'configuration';
assert(config.redis, `${message}: redis`); assert(config.redis, `${message}: redis`);
assert(config.localCache, `${message}: localCache`); assert(config.localCache, `${message}: localCache`);
this.utapiClient = new UtapiClient(config); this.utapiClient = new UtapiClient(config);
this.localCache = new Datastore() this.localCache = new Datastore()
.setClient(redisClientv2(config.localCache, this.log)); .setClient(redisClient(config.localCache, this.log));
if (config.replaySchedule) { if (config.replaySchedule) {
this.replaySchedule = config.replaySchedule; this.replaySchedule = config.replaySchedule;
} }
@ -76,9 +75,7 @@ class UtapiReplay {
* @return {boolean} Returns `true` if object is valid, `false` otherwise. * @return {boolean} Returns `true` if object is valid, `false` otherwise.
*/ */
_validateElement(data) { _validateElement(data) {
const { const { action, reqUid, params, timestamp } = data;
action, reqUid, params, timestamp,
} = data;
if (!action || !reqUid || !params || !timestamp) { if (!action || !reqUid || !params || !timestamp) {
this.log.fatal('missing required parameter in element', this.log.fatal('missing required parameter in element',
{ method: 'UtapiReplay._validateElement' }); { method: 'UtapiReplay._validateElement' });
@ -187,7 +184,8 @@ class UtapiReplay {
this.log.info('disabled utapi replay scheduler'); this.log.info('disabled utapi replay scheduler');
return this; return this;
} }
const replay = scheduleJob(this.replaySchedule, () => this._setLock() const replay = scheduleJob(this.replaySchedule, () =>
this._setLock()
.then(res => { .then(res => {
// If `res` is not `null`, there is no pre-existing lock. // If `res` is not `null`, there is no pre-existing lock.
if (res) { if (res) {
@ -195,7 +193,8 @@ class UtapiReplay {
} }
return undefined; return undefined;
})); }));
replay.on('scheduled', date => this.log.info(`replay job started: ${date}`)); replay.on('scheduled', date =>
this.log.info(`replay job started: ${date}`));
this.log.info('enabled utapi replay scheduler', { this.log.info('enabled utapi replay scheduler', {
schedule: this.replaySchedule, schedule: this.replaySchedule,
}); });
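
One replay step, sketched with only the calls visible in this compare; entries were written by UtapiClient._pushLocalCache as JSON strings, and localCache is the Datastore built in the constructor above.

    localCache.rpop('s3:utapireplay', (err, value) => {
        if (err || value === null) {
            return undefined;   // Redis error or empty list: nothing to replay this round
        }
        const data = JSON.parse(value);   // the module itself goes through safeJsonParse
        const { action, reqUid, params, timestamp } = data;
        if (action && reqUid && params && timestamp) {
            // hand the element back to UtapiClient with its original timestamp
        }
        return undefined;
    });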

View File

@ -4,6 +4,7 @@
* @class * @class
*/ */
class UtapiRequest { class UtapiRequest {
constructor() { constructor() {
this._log = null; this._log = null;
this._validator = null; this._validator = null;
@ -14,15 +15,6 @@ class UtapiRequest {
this._datastore = null; this._datastore = null;
this._requestQuery = null; this._requestQuery = null;
this._requestPath = null; this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
} }
/** /**
@ -275,6 +267,7 @@ class UtapiRequest {
getDatastore() { getDatastore() {
return this._datastore; return this._datastore;
} }
} }
module.exports = UtapiRequest; module.exports = UtapiRequest;

View File

@ -6,6 +6,7 @@ const vaultclient = require('vaultclient');
  */
 class Vault {
+
     constructor(config) {
         const { host, port } = config.vaultd;
         if (config.https) {
@ -41,11 +42,10 @@
      * @return {undefined}
      */
     authenticateV4Request(params, requestContexts, callback) {
-        const {
-            accessKey, signatureFromRequest, region, scopeDate,
-            stringToSign,
-        } = params.data;
-        const { log } = params;
+        const { accessKey, signatureFromRequest, region, scopeDate,
+            stringToSign }
+            = params.data;
+        const log = params.log;
         log.debug('authenticating V4 request');
         const serializedRCs = requestContexts.map(rc => rc.serialize());
         this._client.verifySignatureV4(
@ -59,8 +59,7 @@
             }
             return callback(null,
                 authInfo.message.body.authorizationResults);
-        },
-        );
+        });
     }
 
     /**
@ -77,6 +76,7 @@
         return this._client.getCanonicalIdsByAccountIds(accountIds,
             { reqUid: log.getSerializedUids(), logger: log }, callback);
     }
+
 }
 
 module.exports = Vault;

View File

@ -1,4 +1,3 @@
-/* eslint-disable class-methods-use-this */
 const assert = require('assert');
 const map = require('async/map');
 
@ -53,16 +52,6 @@ class Memory {
         this.data = {};
     }
 
-    /**
-     * A simple wrapper provided for API compatibility with redis
-     * @param {Function} func - Function to call
-     * @param {callback} cb - callback
-     * @returns {undefined}
-     */
-    call(func, cb) {
-        return func(this, cb);
-    }
-
     /**
      * Set key to hold a value
      * @param {string} key - data key
@ -88,8 +77,8 @@
      */
     get(key, cb) {
         assert.strictEqual(typeof key, 'string');
-        process.nextTick(() => cb(null, this.data[key] === undefined
-            ? null : this.data[key]));
+        process.nextTick(() => cb(null, this.data[key] === undefined ?
+            null : this.data[key]));
     }
 
     /**
@ -106,8 +95,8 @@
             }
             const val = parseInt(this.data[key], 10);
             if (Number.isNaN(val)) {
-                throw new Error('Value at key cannot be represented as a '
-                    + 'number');
+                throw new Error('Value at key cannot be represented as a ' +
+                    'number');
             }
             this.data[key] = (val + 1).toString();
             return cb(null, this.data[key]);
@ -130,8 +119,8 @@
             }
             const val = parseInt(this.data[key], 10);
             if (Number.isNaN(val)) {
-                throw new Error('Value at key cannot be represented as a '
-                    + 'number');
+                throw new Error('Value at key cannot be represented as a ' +
+                    'number');
             }
             this.data[key] = (val + num).toString();
             return cb(null, this.data[key]);
@ -152,8 +141,8 @@
             }
             const val = parseInt(this.data[key], 10);
             if (Number.isNaN(val)) {
-                throw new Error('Value at key cannot be represented as a '
-                    + 'number');
+                throw new Error('Value at key cannot be represented as a ' +
+                    'number');
             }
             this.data[key] = (val - 1).toString();
             return cb(null, this.data[key]);
@ -176,8 +165,8 @@
             }
             const val = parseInt(this.data[key], 10);
             if (Number.isNaN(val)) {
-                throw new Error('Value at key cannot be represented as a '
-                    + 'number');
+                throw new Error('Value at key cannot be represented as a ' +
+                    'number');
             }
             this.data[key] = (val - num).toString();
             return cb(null, this.data[key]);
@ -204,7 +193,8 @@
             }
             const valStr = value.toString();
             // compares both arrays of data
-            const found = this.data[key].some(item => JSON.stringify(item) === JSON.stringify([score, valStr]));
+            const found = this.data[key].some(item =>
+                JSON.stringify(item) === JSON.stringify([score, valStr]));
             if (!found) {
                 // as this is a sorted set emulation, it sorts the data by score
                 // after each insertion
@ -237,8 +227,8 @@
                 return cb(null, null);
             }
             const minScore = (min === '-inf') ? this.data[key][0][0] : min;
-            const maxScore = (min === '+inf')
-                ? this.data[key][this.data[key].length - 1][0] : max;
+            const maxScore = (min === '+inf') ?
+                this.data[key][this.data[key].length - 1][0] : max;
             return cb(null, this.data[key].filter(item => item[0] >= minScore
                 && item[0] <= maxScore).map(item => item[1]));
         });
@ -265,8 +255,8 @@
                 return cb(null, null);
             }
             const minScore = (min === '-inf') ? this.data[key][0][0] : min;
-            const maxScore = (min === '+inf')
-                ? this.data[key][this.data[key].length][0] : max;
+            const maxScore = (min === '+inf') ?
+                this.data[key][this.data[key].length][0] : max;
             const cloneKeyData = Object.assign(this.data[key]);
             // Sort keys by scores in the decreasing order, if scores are equal
             // sort by their value in the decreasing order
@ -302,10 +292,11 @@
                 return cb(null, null);
             }
             const minScore = (min === '-inf') ? this.data[key][0][0] : min;
-            const maxScore = (min === '+inf')
-                ? this.data[key][this.data[key].length][0] : max;
+            const maxScore = (min === '+inf') ?
+                this.data[key][this.data[key].length][0] : max;
             const oldLen = this.data[key].length;
-            this.data[key] = this.data[key].filter(item => (item[0] < minScore || item[0] > maxScore));
+            this.data[key] = this.data[key].filter(item =>
+                (item[0] < minScore || item[0] > maxScore));
             return cb(null, (oldLen - this.data[key].length));
         });
     }
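The Memory backend above emulates Redis sorted sets with plain arrays of [score, value] pairs, re-sorted after every insert so that range queries reduce to a filter. A minimal standalone sketch of that idea (illustrative only, not the module's actual API; the key name and values are made up):

// Standalone sketch of the sorted-set emulation used by the Memory backend.
const sets = {};

function zadd(key, score, value) {
    sets[key] = sets[key] || [];
    const entry = [score, value.toString()];
    const exists = sets[key].some(item =>
        JSON.stringify(item) === JSON.stringify(entry));
    if (!exists) {
        // keep the array ordered by score after each insertion
        sets[key].push(entry);
        sets[key].sort((a, b) => a[0] - b[0]);
    }
}

function zrangebyscore(key, min, max) {
    const data = sets[key] || [];
    const minScore = (min === '-inf') ? -Infinity : min;
    const maxScore = (max === '+inf') ? Infinity : max;
    return data
        .filter(item => item[0] >= minScore && item[0] <= maxScore)
        .map(item => item[1]);
}

zadd('s3:buckets:example-bucket:storageUtilized', 1561712400000, 1024);
zadd('s3:buckets:example-bucket:storageUtilized', 1561716000000, 2048);
console.log(zrangebyscore('s3:buckets:example-bucket:storageUtilized', '-inf', '+inf'));
// [ '1024', '2048' ]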

View File

@ -1,117 +0,0 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests
import sys
from threading import Thread
import time
import urllib
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
parser.add_argument("-p", "--sentinel-port", default="16379", help="Sentinel Port")
parser.add_argument("-v", "--redis-password", default=None, help="Redis AUTH Password")
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-b", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
return parser.parse_args()
def safe_print(content):
print("{0}".format(content))
class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password
r = redis.Redis(
host=ip,
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
res = 's3:%s:%s:storageUtilized:counter' % (resource, name)
total_size = r.get(res)
res = 's3:%s:%s:numberOfObjects:counter' % (resource, name)
files = r.get(res)
try:
return {'files': int(files), "total_size": int(total_size)}
except Exception as e:
return {'files': 0, "total_size": 0}
class S3ListBuckets():
def __init__(self, host='127.0.0.1:9000'):
self.bucketd_host = host
def run(self):
docs = []
url = "%s/default/bucket/users..bucket" % self.bucketd_host
session = requests.Session()
r = session.get(url, timeout=30)
if r.status_code == 200:
payload = json.loads(r.text)
for keys in payload['Contents']:
key = keys["key"]
r1 = re.match("(\w+)..\|..(\w+.*)", key)
docs.append(r1.groups())
return docs
return(self.userid, self.bucket, user, files, total_size)
if __name__ == '__main__':
options = get_options()
redis_conf = dict(
ip=options.sentinel_ip,
port=options.sentinel_port,
sentinel_cluster_name=options.sentinel_cluster_name,
password=options.redis_password
)
P = S3ListBuckets(options.bucketd_addr)
listbuckets = P.run()
userids = set([x for x, y in listbuckets])
executor = ThreadPoolExecutor(max_workers=1)
for userid, bucket in listbuckets:
U = askRedis(**redis_conf)
data = U.read('buckets', bucket)
content = "Account:%s|Bucket:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, bucket, data["files"], data["total_size"])
executor.submit(safe_print, content)
data = U.read('buckets', 'mpuShadowBucket'+bucket)
content = "Account:%s|Bucket:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, 'mpuShadowBucket'+bucket, data["files"], data["total_size"])
executor.submit(safe_print, content)
executor.submit(safe_print, "")
for userid in sorted(userids):
U = askRedis(**redis_conf)
data = U.read('accounts', userid)
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, data["files"], data["total_size"])
executor.submit(safe_print, content)
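The report above is driven by just two counter keys per resource. Sketched here in Node for consistency with the rest of the codebase (the resource and bucket names are invented):

// Key layout read by the reporting script; values are examples only.
function counterKeys(resource, name) {
    return {
        numberOfObjects: `s3:${resource}:${name}:numberOfObjects:counter`,
        storageUtilized: `s3:${resource}:${name}:storageUtilized:counter`,
    };
}

console.log(counterKeys('buckets', 'example-bucket'));
// { numberOfObjects: 's3:buckets:example-bucket:numberOfObjects:counter',
//   storageUtilized: 's3:buckets:example-bucket:storageUtilized:counter' }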

View File

@ -1,586 +0,0 @@
import argparse
import concurrent.futures as futures
import functools
import itertools
import json
import logging
import os
import re
import sys
import time
import urllib
from pathlib import Path
from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor
import redis
import requests
from requests import ConnectionError, HTTPError, Timeout
logging.basicConfig(level=logging.INFO)
_log = logging.getLogger('utapi-reindex')
USERS_BUCKET = 'users..bucket'
MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options():
parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
parser.add_argument("-p", "--sentinel-port", default="16379", help="Sentinel Port")
parser.add_argument("-v", "--redis-password", default=None, help="Redis AUTH Password")
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis")
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
def chunks(iterable, size):
it = iter(iterable)
chunk = tuple(itertools.islice(it,size))
while chunk:
yield chunk
chunk = tuple(itertools.islice(it,size))
def _encoded(func):
def inner(*args, **kwargs):
val = func(*args, **kwargs)
return urllib.parse.quote(val.encode('utf-8'))
return inner
Bucket = namedtuple('Bucket', ['userid', 'name', 'object_lock_enabled'])
MPU = namedtuple('MPU', ['bucket', 'key', 'upload_id'])
BucketContents = namedtuple('BucketContents', ['bucket', 'obj_count', 'total_size'])
class MaxRetriesReached(Exception):
def __init__(self, url):
super().__init__('Max retries reached for request to %s'%url)
class InvalidListing(Exception):
def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient:
'''Performs Listing calls against bucketd'''
__url_attribute_format = '{addr}/default/attributes/{bucket}'
__url_bucket_format = '{addr}/default/bucket/{bucket}'
__headers = {"x-scal-request-uids": "utapi-reindex-list-buckets"}
def __init__(self, bucketd_addr=None, max_retries=2, only_latest_when_locked=False):
self._bucketd_addr = bucketd_addr
self._max_retries = max_retries
self._only_latest_when_locked = only_latest_when_locked
self._session = requests.Session()
def _do_req(self, url, check_500=True, **kwargs):
# Add 1 for the initial request
for x in range(self._max_retries + 1):
try:
resp = self._session.get(url, timeout=30, verify=False, headers=self.__headers, **kwargs)
if check_500 and resp.status_code == 500:
_log.warning('500 from bucketd, sleeping 15 secs')
time.sleep(15)
continue
return resp
except (Timeout, ConnectionError) as e:
_log.exception(e)
_log.error('Error during listing, sleeping 5 secs %s'%url)
time.sleep(5)
raise MaxRetriesReached(url)
def _list_bucket(self, bucket, **kwargs):
'''
Lists a bucket lazily until "empty"
bucket: name of the bucket
kwargs: url parameters key=value
        To support multiple next-marker keys and param encoding, a function can
        be passed as a parameter's value. It will be called with the JSON-decoded
        response body as its only argument and is expected to return the
        parameter's value. On the first request the function is called with
        `None` and should return its initial value. Return `None` for the param to be excluded.
'''
url = self.__url_bucket_format.format(addr=self._bucketd_addr, bucket=bucket)
static_params = {k: v for k, v in kwargs.items() if not callable(v)}
dynamic_params = {k: v for k, v in kwargs.items() if callable(v)}
is_truncated = True # Set to True for first loop
payload = None
while is_truncated:
params = static_params.copy() # Use a copy of the static params for a base
for key, func in dynamic_params.items():
params[key] = func(payload) # Call each of our dynamic params with the previous payload
try:
_log.debug('listing bucket bucket: %s params: %s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
resp = self._do_req(url, params=params)
if resp.status_code == 404:
_log.debug('Bucket not found bucket: %s'%bucket)
return
if resp.status_code == 200:
payload = resp.json()
except ValueError as e:
_log.exception(e)
_log.error('Invalid listing response body! bucket:%s params:%s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
continue
except MaxRetriesReached:
_log.error('Max retries reached listing bucket:%s'%bucket)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception during listing! bucket:%s params:%s'%(
bucket, ', '.join('%s=%s'%p for p in params.items())))
raise
yield resp.status_code, payload
if isinstance(payload, dict):
is_truncated = payload.get('IsTruncated', False)
else:
is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try:
resp = self._do_req(url)
if resp.status_code == 200:
return resp.json()
else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name)
except ValueError as e:
_log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name)
raise
except MaxRetriesReached:
_log.error('Max retries reached getting bucket attributes bucket:%s'%name)
raise
except Exception as e:
_log.exception(e)
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise
def get_bucket_md(self, name):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def get_next_marker(p):
if p is None:
return ''
return p.get('Contents', [{}])[-1].get('key', '')
params = {
'delimiter': '',
'maxKeys': 1000,
'marker': get_next_marker
}
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = []
for result in payload.get('Contents', []):
match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False)
# We need to get the attributes for each bucket to determine if it is locked
if self._only_latest_when_locked:
bucket_attrs = self._get_bucket_attributes(bucket.name)
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)
if buckets:
yield buckets
def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
def get_next_marker(p):
if p is None:
return 'overview..|..'
return p.get('NextKeyMarker', '')
def get_next_upload_id(p):
if p is None:
return 'None'
return p.get('NextUploadIdMarker', '')
params = {
'delimiter': '',
'keyMarker': '',
'maxKeys': 1000,
'queryPrefixLength': 0,
'listingType': 'MPU',
'splitter': '..|..',
'prefix': get_next_marker,
'uploadIdMarker': get_next_upload_id,
}
keys = []
for status_code, payload in self._list_bucket(_bucket, **params):
if status_code == 404:
break
for key in payload['Uploads']:
keys.append(MPU(
bucket=bucket,
key=key['key'],
upload_id=key['value']['UploadId']))
return keys
def _sum_objects(self, bucket, listing, only_latest_when_locked = False):
count = 0
total_size = 0
last_key = None
try:
for obj in listing:
if isinstance(obj['value'], dict):
# bucketd v6 returns a dict:
data = obj.get('value', {})
size = data["Size"]
else:
# bucketd v7 returns an encoded string
data = json.loads(obj['value'])
size = data.get('content-length', 0)
is_latest = obj['key'] != last_key
last_key = obj['key']
if only_latest_when_locked and bucket.object_lock_enabled and not is_latest:
_log.debug('Skipping versioned key: %s'%obj['key'])
continue
count += 1
total_size += size
except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
return count, total_size
def _extract_listing(self, key, listing):
for status_code, payload in listing:
contents = payload[key] if isinstance(payload, dict) else payload
if contents is None:
raise InvalidListing('')
for obj in contents:
yield obj
def count_bucket_contents(self, bucket):
def get_key_marker(p):
if p is None:
return ''
return p.get('NextKeyMarker', '')
def get_vid_marker(p):
if p is None:
return ''
return p.get('NextVersionIdMarker', '')
params = {
'listingType': 'DelimiterVersions',
'maxKeys': 1000,
'keyMarker': get_key_marker,
'versionIdMarker': get_vid_marker,
}
listing = self._list_bucket(bucket.name, **params)
count, total_size = self._sum_objects(bucket, self._extract_listing('Versions', listing), self._only_latest_when_locked)
return BucketContents(
bucket=bucket,
obj_count=count,
total_size=total_size
)
def count_mpu_parts(self, mpu):
shadow_bucket_name = MPU_SHADOW_BUCKET_PREFIX + mpu.bucket.name
shadow_bucket = mpu.bucket._replace(name=shadow_bucket_name)
def get_prefix(p):
if p is None:
return mpu.upload_id
return p.get('Contents', [{}])[-1].get('key', '')
@_encoded
def get_next_marker(p):
prefix = get_prefix(p)
return prefix + '..|..00000'
params = {
'prefix': get_prefix,
'marker': get_next_marker,
'delimiter': '',
'maxKeys': 1000,
'listingType': 'Delimiter',
}
listing = self._list_bucket(shadow_bucket_name, **params)
count, total_size = self._sum_objects(shadow_bucket, self._extract_listing('Contents', listing))
return BucketContents(
bucket=shadow_bucket,
obj_count=0, # MPU parts are not counted towards numberOfObjects
total_size=total_size
)
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket):
'''
Takes an instance of BucketDClient and a bucket name, and returns a
tuple of BucketContents for the passed bucket and its mpu shadow bucket.
'''
try:
bucket_total = client.count_bucket_contents(bucket)
mpus = client.list_mpus(bucket)
if not mpus:
return bucket_total
total_size = bucket_total.total_size
mpu_totals = [client.count_mpu_parts(m) for m in mpus]
for mpu in mpu_totals:
total_size += mpu.total_size
return bucket_total._replace(total_size=total_size)
except Exception as e:
_log.exception(e)
_log.error('Error during listing. Removing from results bucket:%s'%bucket.name)
raise InvalidListing(bucket.name)
def update_report(report, key, obj_count, total_size):
'''Convenience function to update the report dicts'''
if key in report:
report[key]['obj_count'] += obj_count
report[key]['total_size'] += total_size
else:
report[key] = {
'obj_count': obj_count,
'total_size': total_size,
}
def get_redis_client(options):
sentinel = redis.Redis(
host=options.sentinel_ip,
port=options.sentinel_port,
db=0,
password=options.redis_password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis(
host=ip,
port=port,
db=0,
password=options.redis_password
)
def update_redis(client, resource, name, obj_count, total_size):
timestamp = int(time.time() - 15 * 60) * 1000
obj_count_key = 's3:%s:%s:numberOfObjects' % (resource, name)
total_size_key = 's3:%s:%s:storageUtilized' % (resource, name)
client.zremrangebyscore(obj_count_key, timestamp, timestamp)
client.zremrangebyscore(total_size_key, timestamp, timestamp)
client.zadd(obj_count_key, {obj_count: timestamp})
client.zadd(total_size_key, {total_size: timestamp})
client.set(obj_count_key + ':counter', obj_count)
client.set(total_size_key + ':counter', total_size)
def get_resources_from_redis(client, resource):
for key in redis_client.scan_iter('s3:%s:*:storageUtilized' % resource):
yield key.decode('utf-8').split(':')[2]
def log_report(resource, name, obj_count, total_size):
print('%s:%s:%s:%s'%(
resource,
name,
obj_count,
total_size
))
if __name__ == '__main__':
options = get_options()
if options.debug:
_log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options)
account_reports = {}
observed_buckets = set()
failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator:
bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()):
try:
total = job.result() # Summed bucket and shadowbucket totals
except InvalidListing:
_bucket = jobs[job]
_log.error('Failed to list bucket %s. Removing from results.'%_bucket.name)
# Add the bucket to observed_buckets anyway to avoid clearing existing metrics
observed_buckets.add(_bucket.name)
# If we can not list one of an account's buckets we can not update its total
failed_accounts.add(_bucket.userid)
continue
observed_buckets.add(total.bucket.name)
update_report(bucket_reports, total.bucket.name, total.obj_count, total.total_size)
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them
if options.dry_run:
for bucket, report in bucket_reports.items():
_log.info(
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % (
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = recorded_buckets.difference(observed_buckets)
_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags')
else:
# Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run:
for userid, report in account_reports.items():
_log.info(
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % (
userid, report['obj_count'], report['total_size']
)
)
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account:
stale_accounts = { a for a in options.account if a not in observed_accounts }
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run:
_log.info("DryRun: not updating stale accounts")
else:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for account in chunk:
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()
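For reference, the write pattern applied by update_redis() above can be sketched in Node with an ioredis-style client (an assumption; key names follow the s3:<resource>:<name>:<metric> layout and all values are invented):

// Sketch only: replace the sorted-set sample at a back-dated timestamp and
// refresh the ':counter' keys holding the current absolute totals.
const Redis = require('ioredis'); // assumed client library

async function writeReindexedTotals(redis, resource, name, objCount, totalSize) {
    // samples are recorded 15 minutes in the past, as in the script above
    const timestamp = Math.floor(Date.now() / 1000 - 15 * 60) * 1000;
    const objCountKey = `s3:${resource}:${name}:numberOfObjects`;
    const totalSizeKey = `s3:${resource}:${name}:storageUtilized`;
    await redis.zremrangebyscore(objCountKey, timestamp, timestamp);
    await redis.zremrangebyscore(totalSizeKey, timestamp, timestamp);
    await redis.zadd(objCountKey, timestamp, objCount);
    await redis.zadd(totalSizeKey, timestamp, totalSize);
    await redis.set(`${objCountKey}:counter`, objCount);
    await redis.set(`${totalSizeKey}:counter`, totalSize);
}

writeReindexedTotals(new Redis({ host: '127.0.0.1', port: 6379 }),
    'buckets', 'example-bucket', 3, 4096);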

View File

@ -32,7 +32,6 @@ const keys = {
     deleteObject: prefix => `${prefix}DeleteObject`,
     multiObjectDelete: prefix => `${prefix}MultiObjectDelete`,
     uploadPart: prefix => `${prefix}UploadPart`,
-    uploadPartCopy: prefix => `${prefix}UploadPartCopy`,
     getObject: prefix => `${prefix}GetObject`,
     getObjectAcl: prefix => `${prefix}GetObjectAcl`,
     getObjectTagging: prefix => `${prefix}GetObjectTagging`,
@ -46,15 +45,6 @@ const keys = {
     putBucketReplication: prefix => `${prefix}PutBucketReplication`,
     getBucketReplication: prefix => `${prefix}GetBucketReplication`,
     deleteBucketReplication: prefix => `${prefix}DeleteBucketReplication`,
-    putBucketObjectLock: prefix => `${prefix}PutBucketObjectLock`,
-    getBucketObjectLock: prefix => `${prefix}GetBucketObjectLock`,
-    putObjectRetention: prefix => `${prefix}PutObjectRetention`,
-    getObjectRetention: prefix => `${prefix}GetObjectRetention`,
-    putObjectLegalHold: prefix => `${prefix}PutObjectLegalHold`,
-    getObjectLegalHold: prefix => `${prefix}GetObjectLegalHold`,
-    replicateObject: prefix => `${prefix}ReplicateObject`,
-    replicateTags: prefix => `${prefix}ReplicateTags`,
-    replicateDelete: prefix => `${prefix}ReplicateDelete`,
     incomingBytes: prefix => `${prefix}incomingBytes`,
     outgoingBytes: prefix => `${prefix}outgoingBytes`,
 };
@ -67,13 +57,11 @@
  * @return {string} - prefix for the schema key
  */
 function getSchemaPrefix(params, timestamp) {
-    const {
-        bucket, accountId, userId, level, service, location,
-    } = params;
+    const { bucket, accountId, userId, level, service, location } = params;
     // `service` property must remain last because other objects also include it
     const id = bucket || accountId || userId || location || service;
-    const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:`
-        : `${service}:${level}:${id}:`;
+    const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:` :
+        `${service}:${level}:${id}:`;
     return prefix;
 }
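Putting the two pieces of this file together, a full metric key is the prefix returned by getSchemaPrefix() followed by one of the entries in the keys map. With made-up values:

// Illustrative values only; mirrors the template strings above.
const service = 's3';
const level = 'buckets';
const id = 'example-bucket';
const timestamp = 1561712400000;
const intervalPrefix = `${service}:${level}:${timestamp}:${id}:`; // s3:buckets:1561712400000:example-bucket:
const globalPrefix = `${service}:${level}:${id}:`;                // s3:buckets:example-bucket:
console.log(`${intervalPrefix}DeleteObject`); // s3:buckets:1561712400000:example-bucket:DeleteObject
console.log(`${globalPrefix}incomingBytes`);  // s3:buckets:example-bucket:incomingBytes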

View File

@ -1,4 +1,3 @@
-/* eslint-disable class-methods-use-this */
 const http = require('http');
 const https = require('https');
 const url = require('url');
@ -7,12 +6,13 @@ const { Clustering, errors, ipCheck } = require('arsenal');
 const arsenalHttps = require('arsenal').https;
 const { Logger } = require('werelogs');
 
+const config = require('./Config');
 const routes = require('../router/routes');
 const Route = require('../router/Route');
 const Router = require('../router/Router');
 const UtapiRequest = require('../lib/UtapiRequest');
 const Datastore = require('./Datastore');
-const redisClientv2 = require('../utils/redisClientv2');
+const redisClient = require('../utils/redisClient');
 
 class UtapiServer {
     /**
@ -27,12 +27,7 @@
     constructor(worker, port, datastore, logger, config) {
         this.worker = worker;
         this.port = port;
-        this.vault = config.vaultclient;
-        if (!this.vault) {
-            const Vault = require('./Vault');
-            this.vault = new Vault(config);
-        }
-        this.router = new Router(config, this.vault);
+        this.router = new Router(config);
         this.logger = logger;
         this.datastore = datastore;
         this.server = null;
@ -75,7 +70,6 @@
         req.socket.setNoDelay();
         const { query, path, pathname } = url.parse(req.url, true);
         const utapiRequest = new UtapiRequest()
-            .setVault(this.vault)
             .setRequest(req)
             .setLog(this.logger.newRequestLogger())
             .setResponse(res)
@ -92,17 +86,15 @@
             || req.method === 'POST')) {
             utapiRequest.setStatusCode(200);
             const allowIp = ipCheck.ipMatchCidrList(
-                config.healthChecks.allowFrom, req.socket.remoteAddress,
-            );
+                config.healthChecks.allowFrom, req.socket.remoteAddress);
             if (!allowIp) {
                 return this.errorResponse(utapiRequest, errors.AccessDenied);
             }
             const redisClient = this.datastore.getClient();
-            if (!redisClient.isReady) {
+            if (redisClient.status !== 'ready') {
                 return this.errorResponse(utapiRequest,
                     errors.InternalError.customizeDescription(
-                        'Redis server is not ready',
-                    ));
+                        'Redis server is not ready'));
             }
             return this.response(utapiRequest, {});
         }
@ -129,7 +121,8 @@
                 rejectUnauthorized: true,
             }, (req, res) => this.requestListener(req, res, this.router));
         } else {
-            this.server = http.createServer((req, res) => this.requestListener(req, res, this.router));
+            this.server = http.createServer((req, res) =>
+                this.requestListener(req, res, this.router));
         }
         this.server.on('listening', () => {
             const addr = this.server.address() || {
@ -219,18 +212,15 @@
  * @property {object} params.log - logger configuration
  * @return {undefined}
  */
-function spawn(config) {
-    const {
-        workers, redis, log, port,
-    } = config;
+function spawn(params) {
+    Object.assign(config, params);
+    const { workers, redis, log, port } = config;
 
-    const logger = new Logger('Utapi', {
-        level: log.logLevel,
-        dump: log.dumpLevel,
-    });
+    const logger = new Logger('Utapi', { level: log.logLevel,
+        dump: log.dumpLevel });
     const cluster = new Clustering(workers, logger);
     cluster.start(worker => {
-        const datastore = new Datastore().setClient(redisClientv2(redis, logger));
+        const datastore = new Datastore().setClient(redisClient(redis, logger));
         const server = new UtapiServer(worker, port, datastore, logger, config);
         server.startup();
     });
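A quick way to exercise the healthcheck path handled above is a plain HTTP GET from an allowed address. This is a hypothetical probe; the host, port and the /_/healthcheck route are assumptions based on the defaults and on the path used elsewhere in this diff:

const http = require('http');

http.get('http://127.0.0.1:8100/_/healthcheck', res => {
    // 200 means the request came from an allowed CIDR and Redis reports ready
    console.log('healthcheck status:', res.statusCode);
}).on('error', err => console.error(err));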

View File

@ -1,7 +0,0 @@
const MemoryCache = require('./memory');
const RedisCache = require('./redis');
module.exports = {
MemoryCache,
RedisCache,
};

View File

@ -1,110 +0,0 @@
const schema = require('../schema');
const constants = require('../../constants');
/**
* Returns null iff the value is undefined.
* Returns the passed value otherwise.
*
* @param {*} value - Any value
* @returns {*} - Passed value or null
*/
function orNull(value) {
return value === undefined ? null : value;
}
class MemoryCache {
constructor() {
this._data = {};
this._shards = {};
this._prefix = 'utapi';
this._expirations = {};
}
// eslint-disable-next-line class-methods-use-this
async connect() {
return true;
}
// eslint-disable-next-line class-methods-use-this
async disconnect() {
Object.values(this._expirations).forEach(clearTimeout);
return true;
}
_expireKey(key, delay) {
if (this._expirations[key]) {
clearTimeout(this._expirations[key]);
}
this._expirations[key] = setTimeout(() => delete this._data[key], delay * 1000);
}
async getKey(key) {
return this._data[key];
}
async setKey(key, data) {
this._data[key] = data;
return true;
}
async addToShard(shard, event) {
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
this._data[metricKey] = event;
if (this._shards[shard]) {
this._shards[shard].push(metricKey);
} else {
this._shards[shard] = [metricKey];
}
return true;
}
async getKeysInShard(shard) {
return this._shards[shard] || [];
}
async fetchShard(shard) {
if (this._shards[shard]) {
return this._shards[shard].map(key => this._data[key]);
}
return [];
}
async deleteShardAndKeys(shard) {
(this._shards[shard] || []).forEach(key => {
delete this._data[key];
});
delete this._shards[shard];
return true;
}
async getShards() {
return Object.keys(this._shards);
}
async shardExists(shard) {
return this._shards[shard.toString()] !== undefined;
}
async updateCounters(metric) {
if (metric.sizeDelta) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, metric.account);
this._data[accountSizeKey] = (this._data[accountSizeKey] || 0) + metric.sizeDelta;
}
}
async updateAccountCounterBase(account, size) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
this._data[accountSizeKey] = 0;
this._data[accountSizeBaseKey] = size;
this._expireKey(accountSizeBaseKey, constants.counterBaseValueExpiration);
}
async fetchAccountSizeCounter(account) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
return [orNull(this._data[accountSizeKey]), orNull(this._data[accountSizeBaseKey])];
}
}
module.exports = MemoryCache;

View File

@ -1,194 +0,0 @@
const RedisClient = require('../../redis');
const schema = require('../schema');
const { LoggerContext } = require('../../utils');
const constants = require('../../constants');
const moduleLogger = new LoggerContext({
module: 'cache.backend.redis.RedisCache',
});
class RedisCache {
constructor(options, prefix) {
this._redis = null;
this._options = options;
this._prefix = prefix || 'utapi';
}
async connect() {
moduleLogger.debug('Connecting to redis...');
this._redis = new RedisClient(this._options);
this._redis.connect();
return true;
}
async disconnect() {
const logger = moduleLogger.with({ method: 'disconnect' });
if (this._redis) {
try {
logger.debug('closing connection to redis');
await this._redis.disconnect();
} catch (error) {
logger.error('error while closing connection to redis', {
error,
});
throw error;
}
this._redis = null;
} else {
logger.debug('disconnect called but no connection to redis found');
}
}
async getKey(key) {
return moduleLogger
.with({ method: 'getKey' })
.logAsyncError(() => this._redis.call(redis => redis.get(key)),
'error fetching key from redis', { key });
}
async setKey(key, value) {
return moduleLogger
.with({ method: 'setKey' })
.logAsyncError(async () => {
const res = await this._redis.call(redis => redis.set(key, value));
return res === 'OK';
}, 'error setting key in redis', { key });
}
async addToShard(shard, metric) {
const logger = moduleLogger.with({ method: 'addToShard' });
return logger
.logAsyncError(async () => {
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
logger.debug('adding metric to shard', { metricKey, shardKey });
const [setResults, saddResults] = await this._redis
.call(redis => redis
.multi([
['set', metricKey, JSON.stringify(metric.getValue())],
['sadd', shardKey, metricKey],
['sadd', shardMasterKey, shardKey],
])
.exec());
let success = true;
if (setResults[1] !== 'OK') {
moduleLogger.error('failed to set metric key', {
metricKey,
shardKey,
res: setResults[1],
});
success = false;
}
if (saddResults[1] !== 1) {
moduleLogger.error('metric key already present in shard', {
metricKey,
shardKey,
res: saddResults[1],
});
success = false;
}
return success;
}, 'error during redis command');
}
async getKeysInShard(shard) {
return moduleLogger
.with({ method: 'getKeysInShard' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
return this._redis.call(redis => redis.smembers(shardKey));
}, 'error while fetching shard keys', { shard });
}
async fetchShard(shard) {
return moduleLogger
.with({ method: 'fetchShard' })
.logAsyncError(async () => {
const keys = await this.getKeysInShard(shard);
if (!keys.length) {
return [];
}
return this._redis.call(redis => redis.mget(...keys));
}, 'error while fetching shard data', { shard });
}
async deleteShardAndKeys(shard) {
return moduleLogger
.with({ method: 'deleteShardAndKeys' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
const keys = await this.getKeysInShard(shard);
return this._redis.call(
redis => redis.multi([
['del', shardKey, ...keys],
['srem', shardMasterKey, shardKey],
]).exec(),
);
}, 'error while deleting shard', { shard });
}
async shardExists(shard) {
return moduleLogger
.with({ method: 'shardExists' })
.logAsyncError(async () => {
const shardKey = schema.getShardKey(this._prefix, shard);
const res = await this._redis.call(redis => redis.exists(shardKey));
return res === 1;
}, 'error while checking shard', { shard });
}
async getShards() {
return moduleLogger
.with({ method: 'getShards' })
.logAsyncError(async () => {
const shardMasterKey = schema.getShardMasterKey(this._prefix);
return this._redis.call(redis => redis.smembers(shardMasterKey));
}, 'error while fetching shards');
}
async updateCounters(metric) {
return moduleLogger
.with({ method: 'updateCounter' })
.logAsyncError(async () => {
if (metric.sizeDelta) {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, metric.account);
await this._redis.call(redis => redis.incrby(accountSizeKey, metric.sizeDelta));
}
}, 'error while updating metric counters');
}
async updateAccountCounterBase(account, size) {
return moduleLogger
.with({ method: 'updateAccountCounterBase' })
.logAsyncError(async () => {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
await this._redis.call(async redis => {
await redis.mset(accountSizeKey, 0, accountSizeBaseKey, size);
await redis.expire(accountSizeBaseKey, constants.counterBaseValueExpiration);
});
}, 'error while updating metric counter base');
}
async fetchAccountSizeCounter(account) {
return moduleLogger
.with({ method: 'fetchAccountSizeCounter' })
.logAsyncError(async () => {
const accountSizeKey = schema.getAccountSizeCounterKey(this._prefix, account);
const accountSizeBaseKey = schema.getAccountSizeCounterBaseKey(this._prefix, account);
const [counter, base] = await this._redis.call(redis => redis.mget(accountSizeKey, accountSizeBaseKey));
return [
counter !== null ? parseInt(counter, 10) : null,
base !== null ? parseInt(base, 10) : null,
];
}, 'error fetching account size counters', { account });
}
}
module.exports = RedisCache;

58
libV2/cache/client.js vendored
View File

@ -1,58 +0,0 @@
const { shardFromTimestamp } = require('../utils');
class CacheClient {
constructor(config) {
this._prefix = config.prefix || 'utapi';
this._cacheBackend = config.cacheBackend;
this._counterBackend = config.counterBackend;
}
async connect() {
return Promise.all([
this._cacheBackend.connect(),
this._counterBackend.connect(),
]);
}
async disconnect() {
return Promise.all([
this._cacheBackend.disconnect(),
this._counterBackend.disconnect(),
]);
}
async pushMetric(metric) {
const shard = shardFromTimestamp(metric.timestamp);
if (!(await this._cacheBackend.addToShard(shard, metric))) {
return false;
}
await this._counterBackend.updateCounters(metric);
return true;
}
async getMetricsForShard(shard) {
return this._cacheBackend.fetchShard(shard);
}
async deleteShard(shard) {
return this._cacheBackend.deleteShardAndKeys(shard);
}
async shardExists(shard) {
return this._cacheBackend.shardExists(shard);
}
async getShards() {
return this._cacheBackend.getShards();
}
async updateAccountCounterBase(account, size) {
return this._counterBackend.updateAccountCounterBase(account, size);
}
async fetchAccountSizeCounter(account) {
return this._counterBackend.fetchAccountSizeCounter(account);
}
}
module.exports = CacheClient;
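A hypothetical usage sketch of this client with the in-memory backends shown above (require paths assume the libV2/cache layout; the metric fields and values are invented):

const CacheClient = require('./client');
const { MemoryCache } = require('./backend');

const client = new CacheClient({
    cacheBackend: new MemoryCache(),
    counterBackend: new MemoryCache(),
});

async function main() {
    await client.connect();
    // pushMetric derives a shard from the timestamp, stores the event in the
    // cache backend and bumps the account size counter.
    await client.pushMetric({
        uuid: 'example-uuid',
        timestamp: Date.now(),
        account: 'example-account',
        sizeDelta: 1024,
    });
    console.log(await client.getShards());
    await client.disconnect();
}

main();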

21
libV2/cache/index.js vendored
View File

@ -1,21 +0,0 @@
const config = require('../config');
const CacheClient = require('./client');
const { MemoryCache, RedisCache } = require('./backend');
const cacheTypes = {
redis: conf => new RedisCache(conf),
memory: () => new MemoryCache(),
};
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis);
module.exports = {
CacheClient,
backends: {
MemoryCache,
RedisCache,
},
client: new CacheClient({ cacheBackend, counterBackend }),
};

27
libV2/cache/schema.js vendored
View File

@ -1,27 +0,0 @@
function getShardKey(prefix, shard) {
return `${prefix}:shard:${shard}`;
}
function getUtapiMetricKey(prefix, metric) {
return `${prefix}:events:${metric.uuid}`;
}
function getShardMasterKey(prefix) {
return `${prefix}:shard:master`;
}
function getAccountSizeCounterKey(prefix, account) {
return `${prefix}:counters:account:${account}:size`;
}
function getAccountSizeCounterBaseKey(prefix, account) {
return `${prefix}:counters:account:${account}:size:base`;
}
module.exports = {
getShardKey,
getUtapiMetricKey,
getShardMasterKey,
getAccountSizeCounterKey,
getAccountSizeCounterBaseKey,
};
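Example keys produced by these helpers with the default 'utapi' prefix (the uuid, shard number and account id are invented):

const schema = require('./schema');

console.log(schema.getShardKey('utapi', 42));                       // utapi:shard:42
console.log(schema.getShardMasterKey('utapi'));                     // utapi:shard:master
console.log(schema.getUtapiMetricKey('utapi', { uuid: 'abc' }));    // utapi:events:abc
console.log(schema.getAccountSizeCounterKey('utapi', 'acct1'));     // utapi:counters:account:acct1:size
console.log(schema.getAccountSizeCounterBaseKey('utapi', 'acct1')); // utapi:counters:account:acct1:size:base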

View File

@ -1,329 +0,0 @@
const { callbackify } = require('util');
const { Transform } = require('stream');
const uuid = require('uuid');
const needle = require('needle');
// These modules are added via the `level-mem` package rather than individually
/* eslint-disable import/no-extraneous-dependencies */
const levelup = require('levelup');
const memdown = require('memdown');
const encode = require('encoding-down');
/* eslint-enable import/no-extraneous-dependencies */
const { UtapiMetric } = require('../models');
const {
LoggerContext,
logEventFilter,
asyncOrCallback,
buildFilterChain,
} = require('../utils');
const moduleLogger = new LoggerContext({
module: 'client',
});
class Chunker extends Transform {
constructor(options) {
super({ objectMode: true, ...options });
this._chunkSize = (options && options.chunkSize) || 100;
this._currentChunk = [];
}
_transform(chunk, encoding, callback) {
this._currentChunk.push(chunk);
if (this._currentChunk.length >= this._chunkSize) {
this.push(this._currentChunk);
this._currentChunk = [];
}
callback();
}
_flush(callback) {
if (this._currentChunk.length) {
this.push(this._currentChunk);
}
callback();
}
}
class Uploader extends Transform {
constructor(options) {
super({ objectMode: true, ...options });
this._ingest = options.ingest;
}
_transform(chunk, encoding, callback) {
this._ingest(chunk.map(i => new UtapiMetric(i.value)))
.then(() => {
this.push({
success: true,
keys: chunk.map(i => i.key),
});
callback();
},
error => {
this.push({
success: false,
keys: [],
});
moduleLogger.error('error uploading metrics from retry cache', { error });
callback();
});
}
}
class UtapiClient {
constructor(config) {
this._host = (config && config.host) || 'localhost';
this._port = (config && config.port) || '8100';
this._tls = (config && config.tls) || {};
this._transport = (config && config.tls) ? 'https' : 'http';
this._logger = (config && config.logger) || moduleLogger;
this._maxCachedMetrics = (config && config.maxCachedMetrics) || 200000; // roughly 100MB
this._numCachedMetrics = 0;
this._disableRetryCache = config && config.disableRetryCache;
this._retryCache = this._disableRetryCache
? null
: levelup(encode(memdown(), { valueEncoding: 'json' }));
this._drainTimer = null;
this._drainCanSchedule = true;
this._drainDelay = (config && config.drainDelay) || 30000;
this._suppressedEventFields = (config && config.suppressedEventFields) || null;
const eventFilters = (config && config.filter) || {};
this._shouldPushMetric = buildFilterChain(eventFilters);
if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => moduleLogger.info(...args), 'utapi event filter enabled', eventFilters);
}
}
async join() {
await this._flushRetryCacheToLogs();
this._retryCache.close();
}
async _pushToUtapi(metrics) {
const resp = await needle(
'post',
`${this._transport}://${this._host}:${this._port}/v2/ingest`,
metrics.map(metric => metric.getValue()),
{ json: true, ...this._tls },
);
if (resp.statusCode !== 200) {
throw Error('failed to push metric, server returned non 200 status code',
{ respCode: resp.statusCode, respMessage: resp.statusMessage });
}
}
async _addToRetryCache(metric) {
if (this._numCachedMetrics < this._maxCachedMetrics) {
try {
await this._retryCache.put(metric.uuid, metric.getValue());
this._numCachedMetrics += 1;
await this._scheduleDrain();
return true;
} catch (error) {
this._logger
.error('error adding metric to retry cache', { error });
this._emitMetricLogLine(metric, { reason: 'error' });
}
} else {
this._emitMetricLogLine(metric, { reason: 'overflow' });
}
return false;
}
async _drainRetryCache() {
return new Promise((resolve, reject) => {
let empty = true;
const toRemove = [];
this._retryCache.createReadStream()
.pipe(new Chunker())
.pipe(new Uploader({ ingest: this._pushToUtapi.bind(this) }))
.on('data', res => {
if (res.success) {
toRemove.push(...res.keys);
} else {
empty = false;
}
})
.on('end', () => {
this._retryCache.batch(
toRemove.map(key => ({ type: 'del', key })),
error => {
if (error) {
this._logger.error('error removing events from retry cache', { error });
reject(error);
return;
}
resolve(empty);
},
);
})
.on('error', reject);
});
}
async _drainRetryCachePreflight() {
try {
const resp = await needle(
'get',
`${this._transport}://${this._host}:${this._port}/_/healthcheck`,
this._tls,
);
return resp.statusCode === 200;
} catch (error) {
this._logger
.debug('drain preflight request failed', { error });
return false;
}
}
async _attemptDrain() {
if (await this._drainRetryCachePreflight()) {
let empty = false;
try {
empty = await this._drainRetryCache();
} catch (error) {
this._logger
.error('Error while draining cache', { error });
}
if (!empty) {
await this._scheduleDrain();
}
}
this._drainTimer = null;
}
async _scheduleDrain() {
if (this._drainCanSchedule && !this._drainTimer) {
this._drainTimer = setTimeout(this._attemptDrain.bind(this), this._drainDelay);
}
}
async _disableDrain() {
this._drainCanSchedule = false;
if (this._drainTimer) {
clearTimeout(this._drainTimer);
this._drainTimer = null;
}
}
_emitMetricLogLine(metric, extra) {
this._logger.info('utapi metric recovery log', {
event: metric.getValue(),
utapiRecovery: true,
...(extra || {}),
});
}
async _flushRetryCacheToLogs() {
const toRemove = [];
return new Promise((resolve, reject) => {
this._retryCache.createReadStream()
.on('data', entry => {
this._emitMetricLogLine(entry.value);
toRemove.push(entry.key);
})
.on('end', () => {
this._retryCache.batch(
toRemove.map(key => ({ type: 'del', key })),
error => {
if (error) {
this._logger.error('error removing events from retry cache', { error });
reject(error);
return;
}
resolve();
},
);
})
.on('error', err => reject(err));
});
}
async _pushMetric(data) {
let metric = data instanceof UtapiMetric
? data
: new UtapiMetric(data);
// If this event has been filtered then exit early
if (!this._shouldPushMetric(metric)) {
return;
}
// Assign a uuid if one isn't passed
if (!metric.uuid) {
metric.uuid = uuid.v4();
}
// Assign a timestamp if one isn't passed
if (!metric.timestamp) {
metric.timestamp = new Date().getTime();
}
if (this._suppressedEventFields !== null) {
const filteredData = Object.entries(metric.getValue())
.filter(([key]) => !this._suppressedEventFields.includes(key))
.reduce((obj, [key, value]) => {
obj[key] = value;
return obj;
}, {});
metric = new UtapiMetric(filteredData);
}
try {
await this._pushToUtapi([metric]);
} catch (error) {
if (!this._disableRetryCache) {
this._logger.error('unable to push metric, adding to retry cache', { error });
if (!await this._addToRetryCache(metric)) {
throw new Error('unable to store metric');
}
} else {
this._logger.debug('unable to push metric. retry cache disabled, not retrying ingestion.', { error });
}
}
}
pushMetric(data, cb) {
if (typeof cb === 'function') {
callbackify(this._pushMetric.bind(this))(data, cb);
return undefined;
}
return this._pushMetric(data);
}
/**
* Get the storageUtilized of a resource
*
* @param {string} level - level of metrics, currently only 'accounts' is supported
* @param {string} resource - id of the resource
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - return a Promise if no callback is provided, undefined otherwise
*/
getStorage(level, resource, callback) {
if (level !== 'accounts') {
throw new Error('invalid level, only "accounts" is supported');
}
return asyncOrCallback(async () => {
const resp = await needle(
'get',
`${this._transport}://${this._host}:${this._port}/v2/storage/${level}/${resource}`,
this._tls,
);
if (resp.statusCode !== 200) {
throw new Error(`unable to retrieve metrics: ${resp.statusMessage}`);
}
return resp.body;
}, callback);
}
}
module.exports = UtapiClient;
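A hypothetical usage sketch of this client; the require path, host, port and event fields are examples and may need adjusting to the UtapiMetric schema:

const UtapiClient = require('./client'); // path assumed

const utapi = new UtapiClient({ host: '127.0.0.1', port: '8100' });

// Promise style: uuid and timestamp are filled in automatically if omitted,
// and the call resolves once the metric is sent or queued in the retry cache.
utapi.pushMetric({ operationId: 'putObject', bucket: 'example-bucket', sizeDelta: 1024 })
    .then(() => console.log('metric accepted'))
    .catch(err => console.error(err));

// Callback style is also supported.
utapi.pushMetric({ operationId: 'getObject', bucket: 'example-bucket' }, err => {
    if (err) {
        console.error(err);
    }
});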

View File

@ -1,64 +0,0 @@
{
"host": "127.0.0.1",
"port": 8100,
"log": {
"logLevel": "info",
"dumpLevel": "error"
},
"redis": {
"host": "127.0.0.1",
"port": 6379
},
"localCache": {
"host": "127.0.0.1",
"port": 6379
},
"warp10": {
"host": "127.0.0.1",
"port": 4802,
"nodeId": "single_node",
"requestTimeout": 60000,
"connectTimeout": 60000
},
"healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"]
},
"cacheBackend": "memory",
"development": false,
"nodeId": "single_node",
"ingestionSchedule": "*/5 * * * * *",
"ingestionShardSize": 10,
"ingestionLagSeconds": 30,
"checkpointSchedule": "*/30 * * * * *",
"snapshotSchedule": "5 0 * * * *",
"repairSchedule": "0 */5 * * * *",
"reindexSchedule": "0 0 0 * * Sun",
"diskUsageSchedule": "0 */15 * * * *",
"bucketd": [ "localhost:9000" ],
"reindex": {
"enabled": true,
"schedule": "0 0 0 * * 6"
},
"diskUsage": {
"retentionDays": 45,
"expirationEnabled": false
},
"serviceUser": {
"arn": "arn:aws:iam::000000000000:user/scality-internal/service-utapi-user",
"enabled": false
},
"filter": {
"allow": {},
"deny": {}
},
"metrics" : {
"enabled": false,
"host": "localhost",
"ingestPort": 10902,
"checkpointPort": 10903,
"snapshotPort": 10904,
"diskUsagePort": 10905,
"reindexPort": 10906,
"repairPort": 10907
}
}
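Because the Config loader further down merges these defaults with the user's config.json, a deployment only has to override the fields it changes; for example (values invented):

{
    "redis": { "host": "redis.internal.example", "port": 6379 },
    "warp10": { "host": "warp10.internal.example", "port": 4802 },
    "ingestionLagSeconds": 60
}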

View File

@ -1,414 +0,0 @@
const fs = require('fs');
const path = require('path');
const Joi = require('@hapi/joi');
const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const {
truthy, envNamespace, allowedFilterFields, allowedFilterStates,
} = require('../constants');
const configSchema = require('./schema');
// We need to require the specific file rather than the parent module to avoid a circular require
const { parseDiskSizeSpec } = require('../utils/disk');
function _splitTrim(char, text) {
return text.split(char).map(v => v.trim());
}
function _splitServer(text) {
assert.notStrictEqual(text.indexOf(':'), -1);
const [host, port] = _splitTrim(':', text);
return {
host,
port: Number.parseInt(port, 10),
};
}
function _splitNode(text) {
assert.notStrictEqual(text.indexOf('='), -1);
const [nodeId, hostname] = _splitTrim('=', text);
return {
nodeId,
..._splitServer(hostname),
};
}
const _typeCasts = {
bool: val => truthy.has(val.toLowerCase()),
int: val => parseInt(val, 10),
list: val => _splitTrim(',', val),
serverList: val => _splitTrim(',', val).map(_splitServer),
nodeList: val => _splitTrim(',', val).map(_splitNode),
diskSize: parseDiskSizeSpec,
};
function _definedInEnv(key) {
return process.env[`${envNamespace}_${key}`] !== undefined;
}
function _loadFromEnv(key, defaultValue, type) {
const envKey = `${envNamespace}_${key}`;
const value = process.env[envKey];
if (value !== undefined) {
if (type !== undefined) {
return type(value);
}
return value;
}
return defaultValue;
}
const defaultConfigPath = path.join(__dirname, '../../config.json');
class Config {
/**
     * Creates a new Config instance, merging the loaded config with the provided values.
* Passed values override loaded ones recursively.
*
* @param {object} overrides - an object using the same structure as the config file
* @returns {Config} - New Config instance
*/
constructor(overrides) {
this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this.host = undefined;
this.port = undefined;
this.healthChecks = undefined;
this.logging = { level: 'debug', dumpLevel: 'error' };
this.redis = undefined;
this.warp10 = undefined;
// read config automatically
const loadedConfig = this._loadConfig();
let parsedConfig = this._parseConfig(loadedConfig);
if (typeof overrides === 'object') {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
}
Object.assign(this, parsedConfig);
werelogs.configure({
            level: this.logging.level,
            dump: this.logging.dumpLevel,
});
}
static _readFile(path, encoding = 'utf-8') {
try {
return fs.readFileSync(path, { encoding });
} catch (error) {
// eslint-disable-next-line no-console
console.error({ message: `error reading file at ${path}`, error });
throw error;
}
}
static _readJSON(path) {
const data = Config._readFile(path);
try {
return JSON.parse(data);
} catch (error) {
// eslint-disable-next-line no-console
console.error({ message: `error parsing JSON from file at ${path}`, error });
throw error;
}
}
_loadDefaults() {
return defaults;
}
_loadUserConfig() {
return Joi.attempt(
Config._readJSON(this._configPath),
configSchema,
'invalid Utapi config',
);
}
_recursiveUpdateArray(parent, child) {
const ret = [];
for (let i = 0; i < Math.max(parent.length, child.length); i += 1) {
ret[i] = this._recursiveUpdate(parent[i], child[i]);
}
return ret;
}
_recursiveUpdateObject(parent, child) {
return Array.from(
new Set([
...Object.keys(parent),
...Object.keys(child)],
// eslint-disable-next-line function-paren-newline
))
.reduce((ret, key) => {
// eslint-disable-next-line no-param-reassign
ret[key] = this._recursiveUpdate(parent[key], child[key]);
return ret;
}, {});
}
/**
* Given two nested Object/Array combos, walk each and return a new object
* with values from child overwriting parent.
* @param {*} parent - Initial value
* @param {*} child - New value
* @returns {*} - Merged value
*/
_recursiveUpdate(parent, child) {
// If no parent value use the child
if (parent === undefined) {
return child;
}
// If no child value use the parent
if (child === undefined) {
return parent;
}
if (Array.isArray(parent) && Array.isArray(child)) {
return this._recursiveUpdateArray(parent, child);
}
if (typeof parent === 'object' && typeof child === 'object') {
return this._recursiveUpdateObject(parent, child);
}
return child;
}
_loadConfig() {
const defaultConf = this._loadDefaults();
const userConf = this._loadUserConfig();
return this._recursiveUpdateObject(defaultConf, userConf);
}
static _parseRedisConfig(prefix, config) {
const redisConf = {
retry: config.retry,
};
if (config.sentinels || _definedInEnv(`${prefix}_SENTINELS`)) {
redisConf.name = _loadFromEnv(`${prefix}_NAME`, config.name);
redisConf.sentinels = _loadFromEnv(
`${prefix}_SENTINELS`,
config.sentinels,
_typeCasts.serverList,
);
redisConf.sentinelPassword = _loadFromEnv(
`${prefix}_SENTINEL_PASSWORD`,
config.sentinelPassword,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
} else {
redisConf.host = _loadFromEnv(
`${prefix}_HOST`,
config.host,
);
redisConf.port = _loadFromEnv(
`${prefix}_PORT`,
config.port,
_typeCasts.int,
);
redisConf.password = _loadFromEnv(
`${prefix}_PASSWORD`,
config.password,
);
}
return redisConf;
}
_loadCertificates(config) {
const { key, cert, ca } = config;
const keyPath = path.isAbsolute(key) ? key : path.join(this._basePath, key);
const certPath = path.isAbsolute(cert) ? cert : path.join(this._basePath, cert);
const certs = {
cert: Config._readFile(certPath, 'ascii'),
key: Config._readFile(keyPath, 'ascii'),
};
if (ca) {
const caPath = path.isAbsolute(ca) ? ca : path.join(this._basePath, ca);
certs.ca = Config._readFile(caPath, 'ascii');
}
return certs;
}
static _parseResourceFilters(config) {
const resourceFilters = {};
allowedFilterFields.forEach(
field => allowedFilterStates.forEach(
state => {
const configResources = (config[state] && config[state][field]) || null;
const envVar = `FILTER_${field.toUpperCase()}_${state.toUpperCase()}`;
const resources = _loadFromEnv(envVar, configResources, _typeCasts.list);
if (resources) {
if (resourceFilters[field]) {
throw new Error('You can not define both an allow and a deny list for an event field.');
}
resourceFilters[field] = { [state]: new Set(resources) };
}
},
),
);
return resourceFilters;
}
_parseConfig(config) {
const parsedConfig = {};
parsedConfig.development = _loadFromEnv('DEV_MODE', config.development, _typeCasts.bool);
parsedConfig.nodeId = _loadFromEnv('NODE_ID', config.nodeId);
parsedConfig.host = _loadFromEnv('HOST', config.host);
parsedConfig.port = _loadFromEnv('PORT', config.port, _typeCasts.int);
const healthCheckFromEnv = _loadFromEnv(
'ALLOW_HEALTHCHECK',
[],
_typeCasts.list,
);
parsedConfig.healthChecks = {
allowFrom: healthCheckFromEnv.concat(config.healthChecks.allowFrom),
};
const certPaths = {
cert: _loadFromEnv('TLS_CERT', config.certFilePaths.cert),
key: _loadFromEnv('TLS_KEY', config.certFilePaths.key),
ca: _loadFromEnv('TLS_CA', config.certFilePaths.ca),
};
if (certPaths.key && certPaths.cert) {
parsedConfig.tls = this._loadCertificates(certPaths);
} else if (certPaths.key || certPaths.cert) {
throw new Error('bad config: both certFilePaths.key and certFilePaths.cert must be defined');
}
parsedConfig.redis = Config._parseRedisConfig('REDIS', config.redis);
parsedConfig.cache = Config._parseRedisConfig('REDIS_CACHE', config.localCache);
parsedConfig.cache.backend = _loadFromEnv('CACHE_BACKEND', config.cacheBackend);
const warp10Conf = {
readToken: _loadFromEnv('WARP10_READ_TOKEN', config.warp10.readToken),
writeToken: _loadFromEnv('WARP10_WRITE_TOKEN', config.warp10.writeToken),
requestTimeout: _loadFromEnv('WARP10_REQUEST_TIMEOUT', config.warp10.requestTimeout, _typeCasts.int),
connectTimeout: _loadFromEnv('WARP10_CONNECT_TIMEOUT', config.warp10.connectTimeout, _typeCasts.int),
};
if (Array.isArray(config.warp10.hosts) || _definedInEnv('WARP10_HOSTS')) {
warp10Conf.hosts = _loadFromEnv('WARP10_HOSTS', config.warp10.hosts, _typeCasts.nodeList);
} else {
warp10Conf.hosts = [{
host: _loadFromEnv('WARP10_HOST', config.warp10.host),
port: _loadFromEnv('WARP10_PORT', config.warp10.port, _typeCasts.int),
nodeId: _loadFromEnv('WARP10_NODE_ID', config.warp10.nodeId),
}];
}
parsedConfig.warp10 = warp10Conf;
parsedConfig.logging = {
level: parsedConfig.development
? 'debug'
: _loadFromEnv('LOG_LEVEL', config.log.logLevel),
dumpLevel: _loadFromEnv(
'LOG_DUMP_LEVEL',
config.log.dumpLevel,
),
};
parsedConfig.ingestionSchedule = _loadFromEnv('INGESTION_SCHEDULE', config.ingestionSchedule);
parsedConfig.checkpointSchedule = _loadFromEnv('CHECKPOINT_SCHEDULE', config.checkpointSchedule);
parsedConfig.snapshotSchedule = _loadFromEnv('SNAPSHOT_SCHEDULE', config.snapshotSchedule);
parsedConfig.repairSchedule = _loadFromEnv('REPAIR_SCHEDULE', config.repairSchedule);
parsedConfig.reindexSchedule = _loadFromEnv('REINDEX_SCHEDULE', config.reindexSchedule);
parsedConfig.diskUsageSchedule = _loadFromEnv('DISK_USAGE_SCHEDULE', config.diskUsageSchedule);
parsedConfig.ingestionLagSeconds = _loadFromEnv(
'INGESTION_LAG_SECONDS',
config.ingestionLagSeconds,
_typeCasts.int,
);
parsedConfig.ingestionShardSize = _loadFromEnv(
'INGESTION_SHARD_SIZE',
config.ingestionShardSize,
_typeCasts.int,
);
const diskUsage = {
path: _loadFromEnv('DISK_USAGE_PATH', (config.diskUsage || {}).path),
hardLimit: _loadFromEnv('DISK_USAGE_HARD_LIMIT', (config.diskUsage || {}).hardLimit),
retentionDays: _loadFromEnv(
'METRIC_RETENTION_PERIOD',
(config.diskUsage || {}).retentionDays, _typeCasts.int,
),
expirationEnabled: _loadFromEnv(
'METRIC_EXPIRATION_ENABLED',
(config.diskUsage || {}).expirationEnabled, _typeCasts.bool,
),
};
if (diskUsage.hardLimit !== undefined) {
diskUsage.hardLimit = parseDiskSizeSpec(diskUsage.hardLimit);
}
if (!diskUsage.path && diskUsage.hardLimit !== undefined) {
throw Error('You must specify diskUsage.path to monitor for disk usage');
} else if (diskUsage.path && diskUsage.hardLimit === undefined) {
throw Error('diskUsage.hardLimit must be specified');
} else if (diskUsage.expirationEnabled && diskUsage.retentionDays === undefined) {
throw Error('diskUsage.retentionDays must be specified');
}
diskUsage.enabled = diskUsage.path !== undefined;
parsedConfig.diskUsage = diskUsage;
parsedConfig.vaultd = {
host: _loadFromEnv('VAULT_HOST', config.vaultd.host),
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
};
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
parsedConfig.serviceUser = {
arn: _loadFromEnv('SERVICE_USER_ARN', config.serviceUser.arn),
enabled: _loadFromEnv('SERVICE_USER_ENABLED', config.serviceUser.enabled, _typeCasts.bool),
};
parsedConfig.filter = Config._parseResourceFilters(config.filter);
parsedConfig.metrics = {
enabled: _loadFromEnv('METRICS_ENABLED', config.metrics.enabled, _typeCasts.bool),
host: _loadFromEnv('METRICS_HOST', config.metrics.host),
ingestPort: _loadFromEnv('METRICS_PORT_INGEST', config.metrics.ingestPort, _typeCasts.int),
checkpointPort: _loadFromEnv('METRICS_PORT_CHECKPOINT', config.metrics.checkpointPort, _typeCasts.int),
snapshotPort: _loadFromEnv('METRICS_PORT_SNAPSHOT', config.metrics.snapshotPort, _typeCasts.int),
diskUsagePort: _loadFromEnv('METRICS_PORT_DISK_USAGE', config.metrics.diskUsagePort, _typeCasts.int),
reindexPort: _loadFromEnv('METRICS_PORT_REINDEX', config.metrics.reindexPort, _typeCasts.int),
repairPort: _loadFromEnv('METRICS_PORT_REPAIR', config.metrics.repairPort, _typeCasts.int),
};
return parsedConfig;
}
/**
* Returns a new Config instance merging the loaded config with the provided values.
* Passed values override loaded ones recursively.
*
* @param {object} newConfig - an object using the same structure as the config file
* @returns {Config} - New Config instance
*/
static merge(newConfig) {
return new Config(newConfig);
}
}
module.exports = new Config();
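// Illustrative sketch (not part of the original file): settings can be
// overridden programmatically with the static merge helper, which builds a new
// Config on top of the loaded file. The port value below is made up.
function buildExampleConfig() {
    // Overrides are applied recursively on top of the parsed configuration
    return Config.merge({ port: 8200 });
}
// Environment overrides use the UTAPI_ prefix (see `envNamespace` in
// ../constants) and are read by _loadFromEnv when this module is first
// required, e.g. UTAPI_PORT=8200 (_typeCasts.int) or
// UTAPI_REDIS_SENTINELS=10.0.0.1:16379,10.0.0.2:16379 (_typeCasts.serverList).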

View File

@ -1,130 +0,0 @@
const Joi = require('@hapi/joi');
const { allowedFilterFields, allowedFilterStates } = require('../constants');
const backoffSchema = Joi.object({
min: Joi.number(),
max: Joi.number(),
deadline: Joi.number(),
jitter: Joi.number(),
factor: Joi.number(),
});
const redisRetrySchema = Joi.object({
connectBackoff: backoffSchema,
});
const redisServerSchema = Joi.object({
host: Joi.string(),
port: Joi.number(),
password: Joi.string().allow(''),
retry: redisRetrySchema,
});
const redisSentinelSchema = Joi.object({
name: Joi.string().default('utapi'),
sentinels: Joi.array().items(Joi.object({
host: Joi.alternatives(Joi.string().hostname(), Joi.string().ip()),
port: Joi.number().port(),
})),
password: Joi.string().default('').allow(''),
sentinelPassword: Joi.string().default('').allow(''),
retry: redisRetrySchema,
});
const warp10SingleHost = Joi.object({
host: Joi.alternatives(Joi.string().hostname(), Joi.string().ip()),
port: Joi.number().port(),
readToken: Joi.string(),
writeToken: Joi.string(),
});
const warp10MultiHost = Joi.object({
hosts: Joi.array().items(Joi.object({
host: Joi.alternatives(Joi.string().hostname(), Joi.string().ip()),
port: Joi.number().port(),
nodeId: Joi.string(),
})),
readToken: Joi.string(),
writeToken: Joi.string(),
});
const tlsSchema = Joi.object({
key: Joi.string(),
cert: Joi.string(),
ca: Joi.string(),
});
const schema = Joi.object({
host: Joi.string(),
port: Joi.number().port(),
certFilePaths: tlsSchema.default({}),
workers: Joi.number(),
development: Joi.boolean(),
log: Joi.object({
logLevel: Joi.alternatives()
.try('error', 'warn', 'info', 'debug', 'trace'),
dumpLevel: Joi.alternatives()
.try('error', 'warn', 'info', 'debug', 'trace'),
}),
redis: Joi.alternatives().try(redisServerSchema, redisSentinelSchema),
localCache: Joi.alternatives().try(redisServerSchema, redisSentinelSchema),
warp10: Joi.alternatives().try(warp10SingleHost, warp10MultiHost),
healthChecks: Joi.object({
allowFrom: Joi.array().items(Joi.string()),
}),
vaultd: Joi.object({
host: Joi.string().hostname(),
port: Joi.number().port(),
}),
reindex: Joi.object({
enabled: Joi.boolean(),
schedule: Joi.string(),
}),
bucketd: Joi.array().items(Joi.string()),
expireMetrics: Joi.boolean(),
expireMetricsTTL: Joi.number(),
cacheBackend: Joi.string().valid('memory', 'redis'),
nodeId: Joi.string(),
ingestionSchedule: Joi.string(),
ingestionShardSize: Joi.number().greater(0),
ingestionLagSeconds: Joi.number().greater(0),
checkpointSchedule: Joi.string(),
snapshotSchedule: Joi.string(),
repairSchedule: Joi.string(),
reindexSchedule: Joi.string(),
diskUsageSchedule: Joi.string(),
diskUsage: Joi.object({
path: Joi.string(),
retentionDays: Joi.number().greater(0),
expirationEnabled: Joi.boolean(),
hardLimit: Joi.string(),
}),
serviceUser: Joi.object({
arn: Joi.string(),
enabled: Joi.boolean(),
}),
filter: Joi.object(allowedFilterStates.reduce(
(filterObj, state) => {
filterObj[state] = allowedFilterFields.reduce(
(stateObj, field) => {
stateObj[field] = Joi.array().items(Joi.string());
return stateObj;
}, {},
);
return filterObj;
}, {},
)),
metrics: {
enabled: Joi.boolean(),
host: Joi.string(),
ingestPort: Joi.number().port(),
checkpointPort: Joi.number().port(),
snapshotPort: Joi.number().port(),
diskUsagePort: Joi.number().port(),
reindexPort: Joi.number().port(),
repairPort: Joi.number().port(),
},
});
module.exports = schema;

View File

@ -1,133 +0,0 @@
const truthy = new Set([
'true',
'on',
'yes',
'y',
't',
'enabled',
'enable',
'1',
]);
const constants = {
envNamespace: 'UTAPI',
operations: [
'abortMultipartUpload',
'completeMultipartUpload',
'copyObject',
'createBucket',
'deleteBucket',
'deleteBucketCors',
'deleteBucketEncryption',
'deleteBucketLifecycle',
'deleteBucketReplication',
'deleteBucketTagging',
'deleteBucketWebsite',
'deleteObject',
'deleteObjectTagging',
'getBucketAcl',
'getBucketCors',
'getBucketEncryption',
'getBucketLifecycle',
'getBucketLocation',
'getBucketNotification',
'getBucketObjectLock',
'getBucketReplication',
'getBucketVersioning',
'getBucketTagging',
'getBucketWebsite',
'getObject',
'getObjectAcl',
'getObjectLegalHold',
'getObjectRetention',
'getObjectTagging',
'headBucket',
'headObject',
'initiateMultipartUpload',
'listBucket',
'listMultipartUploadParts',
'listMultipartUploads',
'multiObjectDelete',
'putBucketAcl',
'putBucketCors',
'putBucketEncryption',
'putBucketLifecycle',
'putBucketNotification',
'putBucketObjectLock',
'putBucketReplication',
'putBucketVersioning',
'putBucketTagging',
'putBucketWebsite',
'putDeleteMarkerObject',
'putObject',
'putObjectAcl',
'putObjectLegalHold',
'putObjectRetention',
'putObjectTagging',
'replicateDelete',
'replicateObject',
'replicateTags',
'uploadPart',
'uploadPartCopy',
],
eventFieldsToWarp10: {
operationId: 'op',
uuid: 'id',
bucket: 'bck',
object: 'obj',
versionId: 'vid',
account: 'acc',
user: 'usr',
location: 'loc',
objectDelta: 'objD',
sizeDelta: 'sizeD',
incomingBytes: 'inB',
outgoingBytes: 'outB',
operations: 'ops',
},
indexedEventFields: [
'acc',
'usr',
'bck',
],
serviceToWarp10Label: {
locations: 'loc',
accounts: 'acc',
users: 'usr',
buckets: 'bck',
},
warp10EventType: ':m:utapi/event:',
warp10RecordType: ':m:utapi/record:',
truthy,
checkpointLagSecs: 300,
snapshotLagSecs: 900,
repairLagSecs: 5,
counterBaseValueExpiration: 86400, // 24hrs
keyVersionSplitter: String.fromCharCode(0),
migrationChunksize: 500,
migrationOpTranslationMap: {
listBucketMultipartUploads: 'listMultipartUploads',
},
ingestionOpTranslationMap: {
putDeleteMarkerObject: 'deleteObject',
},
expirationChunkDuration: 900000000, // 15 minutes in microseconds
allowedFilterFields: [
'operationId',
'location',
'account',
'user',
'bucket',
],
allowedFilterStates: ['allow', 'deny'],
};
constants.operationToResponse = constants.operations
.reduce((prev, opId) => {
prev[opId] = `s3:${opId.charAt(0).toUpperCase() + opId.slice(1)}`;
return prev;
}, {});
module.exports = constants;

View File

@ -1,42 +0,0 @@
{
"AccessDenied": {
"code": 403,
"description": "Access Denied"
},
"InternalError": {
"code": 500,
"description": "The server encountered an internal error. Please retry the request."
},
"InvalidUri": {
"code": 400,
"description": "The requested URI does not represent any resource on the server."
},
"NotImplemented": {
"code": 501,
"description": "This operation has yet to be implemented."
},
"OperationIdNotFound": {
"code": 404,
"description": "The specified operation Id is not found."
},
"OperationTimedOut": {
"code": 500,
"description": "The operation could not be completed within the permitted time."
},
"ServiceUnavailable": {
"code": 503,
"description": "The server (or an internal component) is currently unavailable to receive requests. Please retry your request."
},
"InvalidRequest": {
"code": 400,
"description": "Request validation error"
},
"FailedMigration": {
"code": 1000,
"description": "failed to migrate metrics"
},
"FailedCorrection": {
"code": 1001,
"description": "failed to correct migrated metric"
}
}

View File

@ -1,27 +0,0 @@
/* eslint-disable no-param-reassign */
const utapiErrors = require('./errors.json');
class UtapiError extends Error {
constructor(type, code, desc) {
super(type);
this.code = code;
this.description = desc;
this[type] = true;
this.utapiError = true;
}
customizeDescription(description) {
return new UtapiError(this.message, this.code, description);
}
}
function errorsGen() {
return Object.keys(utapiErrors)
.reduce((errors, name) => {
errors[name] = new UtapiError(name, utapiErrors[name].code,
utapiErrors[name].description);
return errors;
}, {});
}
module.exports = errorsGen();
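// Usage sketch (not part of the original file): handlers throw these errors
// directly, optionally with a customized description; `code` and `utapiError`
// are inspected by the server-side error middleware.
const exampleErrors = module.exports;
function assertAccountLevel(level) {
    if (level !== 'accounts') {
        throw exampleErrors.InvalidRequest
            .customizeDescription(`unsupported metrics level "${level}"`);
    }
}
// exampleErrors.AccessDenied.code === 403; exampleErrors.AccessDenied.utapiError === true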

View File

@ -1,16 +0,0 @@
const bucketclient = require('bucketclient');
const { BucketClientInterface } = require('arsenal').storage.metadata.bucketclient;
const config = require('../config');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const params = {
bucketdBootstrap: config.bucketd,
https: config.tls,
};
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

View File

@ -1,141 +0,0 @@
/* eslint-disable no-restricted-syntax */
const arsenal = require('arsenal');
const async = require('async');
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = arsenal.constants;
const { BucketInfo } = arsenal.models;
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
const PAGE_SIZE = 1000;
async function _listingWrapper(bucket, params) {
return new Promise(
(resolve, reject) => metadata.listObject(
bucket,
params,
logger.newRequestLogger(),
(err, res) => {
if (err) {
reject(err);
return;
}
resolve(res);
},
),
);
}
function _listObject(bucket, prefix, hydrateFunc) {
const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
let gt;
return {
async* [Symbol.asyncIterator]() {
while (true) {
let res;
try {
// eslint-disable-next-line no-await-in-loop
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt });
} catch (error) {
moduleLogger.error('Error during listing', { error });
throw error;
}
for (const item of res) {
yield hydrateFunc ? hydrateFunc(item) : item;
}
if (res.length !== PAGE_SIZE) {
break;
}
gt = res[res.length - 1].key;
}
},
};
}
function listObjects(bucket) {
return _listObject(bucket, '', data => {
const { key, value } = data;
const [name, version] = key.split(keyVersionSplitter);
return {
name,
version,
value: JSON.parse(value),
};
});
}
function listBuckets() {
return _listObject(usersBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
async function listMPUs(bucket) {
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
return _listObject(mpuBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
function bucketExists(bucket) {
return new Promise((resolve, reject) => metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(),
err => {
if (err && (!err.is || !err.is.NoSuchBucket)) {
reject(err);
return;
}
resolve(err === null);
},
));
}
function getBucket(bucket) {
return new Promise((resolve, reject) => {
metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(), (err, data) => {
if (err) {
reject(err);
return;
}
resolve(BucketInfo.fromObj(data));
},
);
});
}
module.exports = {
listBuckets,
listObjects,
listMPUs,
bucketExists,
getBucket,
};
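// Usage sketch (not part of the original file): the listing helpers return
// objects implementing Symbol.asyncIterator, so callers consume them with
// `for await`; pages of PAGE_SIZE keys are fetched lazily as the loop runs.
// The bucket name is illustrative.
async function countObjectsExample(bucketName) {
    if (!(await bucketExists(bucketName))) {
        return 0;
    }
    let total = 0;
    // eslint-disable-next-line no-unused-vars
    for await (const entry of listObjects(bucketName)) {
        total += 1;
    }
    return total;
}
// countObjectsExample('example-bucket').then(console.log);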

View File

@ -1,71 +0,0 @@
const Joi = require('@hapi/joi');
class BaseModel {
constructor(data) {
this._data = data || {};
}
_get(key, defaultValue) {
const val = this._data[key];
return val === undefined ? defaultValue : val;
}
_set(key, value) {
this._data[key] = value;
return this;
}
getValue() {
return this._data;
}
}
/*
Builds a flexible data container with automatic field checking using `hapi/joi`.
Getters and Setters are automatically created for fields so they can be accessed using `.` notation.
@param name - Name for the model. Used as the internal js class name.
@param schema - An object of joi schemas. Keys are used as field names and
the schemas are used to typecheck values.
@returns - A subclass of BaseModel
*/
function buildModel(name, schema) {
class Model extends BaseModel {
constructor(data) {
if (data !== undefined) {
Object.entries(data).forEach(([key, value]) => {
if (schema[key]) {
Joi.attempt(value, schema[key]);
}
});
}
super(data);
}
_set(key, value) {
if (schema[key]) {
Joi.attempt(value, schema[key]);
}
return super._set(key, value);
}
}
Object.defineProperty(Model, 'name', { value: name });
Object.keys(schema).forEach(key =>
Object.defineProperty(Model.prototype, key, {
            // `function` is used rather than `=>` so that `this` refers to the model instance
/* eslint-disable func-names, object-shorthand */
get: function () {
return this._get(key);
},
set: function (value) {
this._set(key, value);
},
/* eslint-enable func-names, object-shorthand */
}));
return Model;
}
module.exports = {
BaseModel,
buildModel,
};
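// Usage sketch (not part of the original file): building a small model. Values
// are validated against the joi schema both in the constructor and on
// assignment through the generated setters. The field names are illustrative.
const ExampleEvent = buildModel('ExampleEvent', {
    name: Joi.string(),
    count: Joi.number().min(0),
});
const ev = new ExampleEvent({ name: 'putObject' });
ev.count = 2; // passes Joi.number().min(0)
// ev.count = -1; // would throw a joi ValidationError
// ev.getValue(); // => { name: 'putObject', count: 2 }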

View File

@ -1,75 +0,0 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const { apiOperations } = require('../server/spec');
const ResponseContainer = require('./ResponseContainer');
const { httpRequestDurationSeconds } = require('../server/metrics');
const apiTags = Object.keys(apiOperations);
const apiOperationIds = Object.values(apiOperations)
.reduce((ids, ops) => {
ops.forEach(id => ids.add(id));
return ids;
}, new Set());
const contextSchema = {
host: Joi.string(),
protocol: Joi.string().valid('http', 'https'),
url: Joi.string().uri({ scheme: ['http', 'https'] }),
operationId: Joi.string().valid(...apiOperationIds),
tag: Joi.string().valid(...apiTags),
encrypted: Joi.boolean(),
logger: Joi.any(),
request: Joi.any(),
results: Joi.any(),
requestTimer: Joi.any(),
};
const RequestContextModel = buildModel('RequestContext', contextSchema);
class RequestContext extends RequestContextModel {
constructor(request) {
const host = request.headers.host || 'localhost';
const protocol = RequestContext._determineProtocol(request);
const encrypted = protocol === 'https';
const url = `${protocol}://${host}${request.url}`;
const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation;
const requestTimer = tag !== 'internal'
? httpRequestDurationSeconds.startTimer({ action: operationId })
: null;
request.logger.logger.addDefaultFields({
tag,
operationId,
service: 'utapi',
});
super({
request,
host,
url,
protocol,
operationId,
tag,
encrypted,
results: new ResponseContainer(),
logger: request.logger,
requestTimer,
});
}
static _determineProtocol(request) {
// Respect the X-Forwarded-Proto header if set
if (request.headers['x-forwarded-proto']) {
return request.headers['x-forwarded-proto'] === 'https' ? 'https' : 'http';
}
// Use req.connection.encrypted for fallback
return request.connection.encrypted !== undefined
&& request.connection.encrypted ? 'https' : 'http';
}
}
module.exports = RequestContext;

View File

@ -1,31 +0,0 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const orNull = schema => Joi.alternatives(schema, Joi.any().valid(null));
const responseSchema = {
body: Joi.any(),
statusCode: orNull(Joi.number().min(100).max(599)),
redirect: orNull(Joi.string().uri({ scheme: ['http', 'https'], allowRelative: true })),
};
const ResponseContainerModel = buildModel('RequestContext', responseSchema);
class ResponseContainer extends ResponseContainerModel {
constructor() {
super({ body: null, statusCode: null, redirect: null });
}
hasBody() {
return this.body !== null;
}
hasStatusCode() {
return this.statusCode !== null;
}
hasRedirect() {
return this.redirect !== null;
}
}
module.exports = ResponseContainer;

View File

@ -1,22 +0,0 @@
const Joi = require('@hapi/joi');
const { operations } = require('../constants');
const { buildModel } = require('./Base');
const metricSchema = {
operationId: Joi.string().valid(...operations),
uuid: Joi.string(),
timestamp: Joi.number(),
bucket: Joi.string(),
object: Joi.string(),
versionId: Joi.string(),
account: Joi.string(),
user: Joi.string(),
location: Joi.string(),
objectDelta: Joi.number(),
sizeDelta: Joi.number(),
incomingBytes: Joi.number(),
outgoingBytes: Joi.number(),
};
module.exports = buildModel('UtapiMetric', metricSchema);

View File

@ -1,13 +0,0 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const recordSchema = {
timestamp: Joi.number(),
objectDelta: Joi.number(),
sizeDelta: Joi.number(),
incomingBytes: Joi.number(),
outgoingBytes: Joi.number(),
operations: Joi.object(),
};
module.exports = buildModel('UtapiRecord', recordSchema);

View File

@ -1,13 +0,0 @@
const BaseModel = require('./Base');
const UtapiMetric = require('./UtapiMetric');
const UtapiRecord = require('./UtapiRecord');
const RequestContext = require('./RequestContext');
const ResponseContainer = require('./ResponseContainer');
module.exports = {
BaseModel,
UtapiMetric,
RequestContext,
ResponseContainer,
UtapiRecord,
};

View File

@ -1,45 +0,0 @@
const { EventEmitter } = require('events');
const os = require('os');
const { Command } = require('commander');
const { logger } = require('./utils');
class Process extends EventEmitter {
constructor(...options) {
super(...options);
this._program = null;
}
async setup() {
const cleanUpFunc = this.join.bind(this);
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, cleanUpFunc);
});
process.on('uncaughtException', error => {
logger.error('uncaught exception',
{ error, stack: error.stack.split(os.EOL) });
cleanUpFunc();
});
this._program = new Command();
await this._setup();
}
async start() {
this._program.parse(process.argv);
await this._start();
}
async join() {
this.emit('exit');
await this._join();
}
/* eslint-disable class-methods-use-this,no-empty-function */
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
}
module.exports = Process;

View File

@ -1,211 +0,0 @@
const EventEmitter = require('events');
const { callbackify, promisify } = require('util');
const IORedis = require('ioredis');
const { jsutil } = require('arsenal');
const BackOff = require('backo');
const { whilst } = require('async');
const errors = require('./errors');
const { LoggerContext } = require('./utils/log');
const { asyncOrCallback } = require('./utils/func');
const moduleLogger = new LoggerContext({
module: 'redis',
});
const COMMAND_TIMEOUT = 10000;
const CONNECTION_TIMEOUT = 30000;
/**
 * Creates a new Redis client instance
 * @param {object} options - redis configuration
 * @param {string} options.host - redis host
 * @param {number} options.port - redis port
 * @param {string} [options.password] - redis password (optional)
 * @param {string} [options.sentinelPassword] - sentinel password (optional)
 * @param {Array<Object>} [options.sentinels] - sentinel hosts (optional)
 * @param {object} [options.retry] - retry/backoff configuration (optional)
 * @return {RedisClient} - Redis client instance
 */
class RedisClient extends EventEmitter {
constructor(options) {
super();
this._redisOptions = options;
this._redis = null;
// Controls the use of additional command timeouts
// Only use if connecting to a sentinel cluster
this._useTimeouts = options.sentinels !== undefined;
this._inFlightTimeouts = this._useTimeouts ? new Set() : null;
this._runningRedisProbe = null;
this._isConnected = false;
this._isReady = false;
}
connect(callback) {
this._initClient(false);
if (callback) {
process.nextTick(callback);
}
}
disconnect(callback) {
return asyncOrCallback(async () => {
if (this._useTimeouts) {
                // _inFlightTimeouts is a Set, so iterate it directly
                this._inFlightTimeouts.forEach(clearTimeout);
}
if (this._redis !== null) {
await this._redis.quit();
this._redis = null;
}
}, callback);
}
get isReady() {
return this._isConnected && this._isReady;
}
_initClient(startProbe = true) {
moduleLogger.debug('initializing redis client');
if (this._redis !== null) {
this._redis.off('connect', this._onConnect);
this._redis.off('ready', this._onReady);
this._redis.off('error', this._onError);
this._redis.disconnect();
}
this._isConnected = false;
this._isReady = false;
this._redis = new IORedis(this._redisOptions);
this._redis.on('connect', this._onConnect.bind(this));
this._redis.on('ready', this._onReady.bind(this));
this._redis.on('error', this._onError.bind(this));
if (startProbe && this._runningRedisProbe === null) {
this._runningRedisProbe = setInterval(this._probeRedis.bind(this), CONNECTION_TIMEOUT);
}
}
_probeRedis() {
if (this.isReady) {
moduleLogger.debug('redis client is ready, clearing reinitialize interval');
clearInterval(this._runningRedisProbe);
this._runningRedisProbe = null;
} else {
moduleLogger.warn('redis client has failed to become ready, reinitializing');
this._initClient();
}
}
_onConnect() {
this._isConnected = true;
this.emit('connect');
}
_onReady() {
this._isReady = true;
this.emit('ready');
}
_onError(error) {
this._isReady = false;
moduleLogger.error('error connecting to redis', { error });
if (this.listenerCount('error') > 0) {
this.emit('error', error);
}
}
_createCommandTimeout() {
let timer;
let onTimeout;
const cancelTimeout = jsutil.once(() => {
clearTimeout(timer);
this.off('timeout', onTimeout);
this._inFlightTimeouts.delete(timer);
});
const timeout = new Promise((_, reject) => {
timer = setTimeout(this.emit.bind(this, 'timeout'), COMMAND_TIMEOUT);
this._inFlightTimeouts.add(timer);
onTimeout = () => {
moduleLogger.warn('redis command timed out');
cancelTimeout();
this._initClient();
reject(errors.OperationTimedOut);
};
this.once('timeout', onTimeout);
});
return { timeout, cancelTimeout };
}
async _call(asyncFunc) {
const start = Date.now();
const { connectBackoff } = this._redisOptions.retry || {};
const backoff = new BackOff(connectBackoff);
const timeoutMs = (connectBackoff || {}).deadline || 2000;
let retried = false;
return new Promise((resolve, reject) => {
whilst(
next => { // WARNING: test is asynchronous in `async` v3
if (!connectBackoff && !this.isReady) {
moduleLogger.warn('redis not ready and backoff is not configured');
}
process.nextTick(next, null, !!connectBackoff && !this.isReady);
},
next => {
retried = true;
if ((Date.now() - start) > timeoutMs) {
moduleLogger.error('redis still not ready after max wait, giving up', { timeoutMs });
return next(errors.InternalError.customizeDescription(
'redis client is not ready',
));
}
const backoffDurationMs = backoff.duration();
moduleLogger.error('redis not ready, retrying', { backoffDurationMs });
return setTimeout(next, backoffDurationMs);
},
err => {
if (err) {
return reject(err);
}
if (retried) {
moduleLogger.info('redis connection recovered', {
recoveryOverheadMs: Date.now() - start,
});
}
const funcPromise = asyncFunc(this._redis);
if (!this._useTimeouts) {
// If timeouts are disabled simply return the Promise
return resolve(funcPromise);
}
const { timeout, cancelTimeout } = this._createCommandTimeout();
try {
// timeout always rejects so we can just return
return resolve(Promise.race([funcPromise, timeout]));
} finally {
cancelTimeout();
}
},
);
});
}
call(func, callback) {
if (callback !== undefined) {
// If a callback is provided `func` is assumed to also take a callback
// and is converted to a promise using promisify
return callbackify(this._call.bind(this))(promisify(func), callback);
}
return this._call(func);
}
}
module.exports = RedisClient;
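// Usage sketch (not part of the original file): commands are funnelled through
// `call`, which waits for the client to become ready using the configured
// connect backoff. Host, port and key names below are illustrative.
async function exampleRedisUsage() {
    const client = new RedisClient({
        host: '127.0.0.1',
        port: 6379,
        retry: { connectBackoff: { min: 10, max: 1000, jitter: 0.1, factor: 1.5, deadline: 10000 } },
    });
    client.connect();
    // The supplied function receives the underlying ioredis instance
    const value = await client.call(redis => redis.incr('example:counter'));
    await client.disconnect();
    return value;
}
// A callback flavour also exists: client.call((redis, cb) => redis.get('example:counter', cb), done);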

View File

@ -1,5 +0,0 @@
const ApiController = require('../controller');
const controller = new ApiController('internal');
module.exports = controller.buildMap();

View File

@ -1,6 +0,0 @@
async function healthcheck(ctx) {
// eslint-disable-next-line no-param-reassign
ctx.results.statusCode = 200;
}
module.exports = healthcheck;

View File

@ -1,14 +0,0 @@
const { collectDefaultMetrics, register } = require('prom-client');
collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
async function prometheusMetrics(ctx) {
// eslint-disable-next-line no-param-reassign
ctx.results.statusCode = 200;
ctx.results.body = await register.metrics();
}
module.exports = prometheusMetrics;

View File

@ -1,5 +0,0 @@
const ApiController = require('../controller');
const controller = new ApiController('metrics');
module.exports = controller.buildMap();

View File

@ -1,63 +0,0 @@
const errors = require('../../../errors');
const { serviceToWarp10Label } = require('../../../constants');
const { clients: warp10Clients } = require('../../../warp10');
const { client: cache } = require('../../../cache');
const { now, iterIfError } = require('../../../utils');
/**
*
* @param {RequestContext} ctx - request context
* @param {object} params - request parameters
* @param {string} params.level - metric level
* @param {string} params.resource - Id of the requested resource
* @returns {Promise<undefined>} -
*/
async function getStorage(ctx, params) {
const { level, resource } = params;
if (level !== 'accounts') {
        throw errors.InvalidRequest
            .customizeDescription(`Unsupported level "${level}". Only "accounts" is currently supported`);
}
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
let storageUtilized;
if (base !== null) {
storageUtilized = counter + base;
} else {
const labelName = serviceToWarp10Label[params.level];
const labels = { [labelName]: resource };
const res = await iterIfError(warp10Clients, warp10 => {
const options = {
params: {
end: now(),
labels,
node: warp10.nodeId,
},
macro: 'utapi/getMetricsAt',
};
return warp10.exec(options);
}, error => ctx.logger.error('error while fetching metrics', { error }));
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { level, resource });
throw errors.InternalError;
}
const { sizeD: currentSize } = res.result[0];
await cache.updateAccountCounterBase(resource, currentSize);
storageUtilized = currentSize;
}
ctx.results.statusCode = 200;
ctx.results.body = {
storageUtilized: Math.max(storageUtilized, 0),
resource,
level,
};
}
module.exports = getStorage;

View File

@ -1,27 +0,0 @@
const errors = require('../../../errors');
const { UtapiMetric } = require('../../../models');
const { client: cacheClient } = require('../../../cache');
const { convertTimestamp } = require('../../../utils');
const { ingestionOpTranslationMap } = require('../../../constants');
async function ingestMetric(ctx, params) {
let metrics;
try {
metrics = params.body.map(m => new UtapiMetric({
...m,
timestamp: convertTimestamp(m.timestamp),
operationId: ingestionOpTranslationMap[m.operationId] || m.operationId,
}));
} catch (error) {
throw errors.InvalidRequest;
}
try {
await Promise.all(metrics.map(m => cacheClient.pushMetric(m)));
} catch (error) {
throw errors.ServiceUnavailable;
}
// eslint-disable-next-line no-param-reassign
ctx.results.statusCode = 200;
}
module.exports = ingestMetric;

View File

@ -1,113 +0,0 @@
const errors = require('../../../errors');
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
const { convertTimestamp, iterIfError } = require('../../../utils');
const { clients: warp10Clients } = require('../../../warp10');
const emptyOperationsResponse = Object.values(operationToResponse)
.reduce((prev, key) => {
prev[key] = 0;
return prev;
}, {});
const metricResponseKeys = {
buckets: 'bucketName',
accounts: 'accountId',
users: 'userId',
service: 'serviceName',
};
function positiveOrZero(value) {
return Math.max(value, 0);
}
async function listMetric(ctx, params) {
const labelName = serviceToWarp10Label[params.level];
const resources = params.body[params.level];
let [start, end] = params.body.timeRange;
if (end === undefined) {
end = Date.now();
}
let results;
try {
// A separate request will be made to warp 10 per requested resource
results = await Promise.all(
resources.map(async ({ resource, id }) => {
const labels = { [labelName]: id };
const res = await iterIfError(warp10Clients, warp10 => {
const options = {
params: {
start: convertTimestamp(start).toString(),
end: convertTimestamp(end).toString(),
labels,
node: warp10.nodeId,
},
macro: 'utapi/getMetrics',
};
return warp10.exec(options);
}, error => ctx.logger.error('error during warp 10 request', {
error,
requestParams: {
start,
end,
labels,
},
}));
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { resource, type: params.level });
throw errors.InternalError;
}
const rawMetrics = JSON.parse(res.result[0]);
// Due to various error cases it is possible for metrics in utapi to go negative.
// As this is nonsensical to the user we replace any negative values with zero.
const metrics = {
storageUtilized: rawMetrics.storageUtilized.map(positiveOrZero),
numberOfObjects: rawMetrics.numberOfObjects.map(positiveOrZero),
incomingBytes: positiveOrZero(rawMetrics.incomingBytes),
outgoingBytes: positiveOrZero(rawMetrics.outgoingBytes),
operations: rawMetrics.operations,
};
return {
resource,
metrics,
};
}),
);
} catch (error) {
ctx.logger.error('error fetching metrics from warp10', { error });
throw errors.InternalError;
}
// Convert the results from warp10 into the expected response format
const resp = results
.map(result => {
const operations = Object.entries(result.metrics.operations)
.reduce((prev, [key, value]) => {
prev[operationToResponse[key]] = value;
return prev;
}, {});
const metric = {
...result.metrics,
timeRange: [start, end],
operations: {
...emptyOperationsResponse,
...operations,
},
};
metric[metricResponseKeys[params.level]] = result.resource;
return metric;
});
ctx.results.body = resp;
ctx.results.statusCode = 200;
}
module.exports = listMetric;

View File

@ -1,188 +0,0 @@
/* eslint-disable no-param-reassign */
const { apiOperations, apiOperationMiddleware } = require('./spec');
const { middleware: utapiMiddleware } = require('./middleware');
const RequestContext = require('../models/RequestContext');
const errors = require('../errors');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'server.controller',
});
/**
* ApiController
* @param {string} tag - Controller tag to load, should match `x-router-controller` from openapi spec
* @returns {undefined}
*/
class APIController {
constructor(tag) {
this._handlers = APIController._collectHandlers(tag);
this._middleware = APIController._collectHandlerMiddleware(tag);
}
static _safeRequire(path) {
try {
// eslint-disable-next-line import/no-dynamic-require, global-require
return require(path);
} catch (error) {
if (error.code !== 'MODULE_NOT_FOUND') {
moduleLogger
.with({ method: 'APIController::_safeRequire' })
.error(`error while loading handler from ${path}`);
throw error;
}
return null;
}
}
static _notImplementedHandler(tag, operationId) {
// eslint-disable-next-line no-unused-vars
return async (ctx, params) => {
throw errors.NotImplemented.customizeDescription(
`the operation "${tag}::${operationId}" has not been implemented`,
);
};
}
static _getOperationHandler(tag, operationId) {
const op = APIController._safeRequire(`./API/${tag}/${operationId}`);
if (op === null) {
moduleLogger
.with({ method: 'APIController::_getOperationHandler' })
.error(`no handler for ${tag}:${operationId} found, using notImplemented handler`);
return APIController._notImplementedHandler(tag, operationId);
}
return op;
}
static _collectHandlers(tag) {
return Array.from(apiOperations[tag]).reduce((handlers, id) => {
handlers[id] = APIController._getOperationHandler(tag, id);
return handlers;
}, {});
}
static _collectHandlerMiddleware(tag) {
return Object.entries(apiOperationMiddleware[tag])
.reduce((handlers, [id, handler]) => {
const middleware = [];
if (handler.iplimit) {
middleware.push(utapiMiddleware.clientIpLimitMiddleware);
}
if (handler.authv4) {
middleware.push(utapiMiddleware.authV4Middleware);
}
handlers[id] = middleware;
return handlers;
}, {});
}
static _extractParams(req) {
return Object.entries(req.swagger.params)
.reduce((params, [key, value]) => {
params[key] = value.value;
return params;
}, {});
}
static async _writeResult(results, response) {
// If no results have been set return a 500
if (
!results.hasRedirect()
&& !results.hasBody()
&& !results.hasStatusCode()
) {
throw errors.InternalError;
}
// If we have a redirect, do it
if (results.hasRedirect()) {
response.redirect(results.redirect);
// If we have both a body & status, send both
} else if (results.hasBody() && results.hasStatusCode()) {
response.status(results.statusCode).send(results.body);
// If all we have is a status code, then send it with an empty body
} else if (results.hasStatusCode() && !results.hasBody()) {
response.sendStatus(results.statusCode);
// If no status code is set, but we have a body, assume `200` and send
} else if (results.hasBody() && !results.hasStatusCode()) {
response.status(200).send(results.body);
}
}
static _buildRequestContext(req) {
return new RequestContext(req);
}
/**
* callOperation
*
* Constructs the request context, extracts operation parameters, calls the
* operation handler, and writes its result.
*
* @param {function} handler - Function returning a Promise implementing the operation
* @param {Request} request - Express request object
* @param {Response} response - Express response object
* @param {Object} params - Extracted request parameters
* @returns {undefined} -
*/
static async _callOperation(handler, request, response, params) {
try {
await handler(request.ctx, params);
} catch (error) {
request.logger.error('error during operation', { error });
throw error;
}
request.logger.debug('writing operation result');
try {
await APIController._writeResult(request.ctx.results, response);
} catch (error) {
request.logger.error(
'error while writing operation result',
{ error },
);
throw error;
}
}
static async _callMiddleware(middleware, request, response, params) {
await middleware.reduce(
(chain, mw) => (chain
? chain.then(() => mw(request, response, params))
: mw(request, response, params)),
null,
);
}
static callOperation(operationId, handler, middleware, request, response, done) {
request.ctx = APIController._buildRequestContext(request);
const requestParams = APIController._extractParams(request);
request.logger.debug(`calling middleware for ${operationId}`);
APIController._callMiddleware(middleware, request, response, requestParams)
.then(() => {
request.logger.debug(`calling operation ${operationId}`);
return APIController._callOperation(handler, request, response, requestParams);
})
.then(
done,
done,
);
}
/**
* buildMap
*
* Constructs an object of `operationId`|`callOperation` pairs for use as a controller with oas-tools
* @returns {Object} - Map of operationIds to handler
*/
buildMap() {
return Object.entries(this._handlers)
.reduce((ops, [id, handler]) => {
ops[id] = (request, response, done) =>
APIController.callOperation(id, handler, this._middleware[id], request, response, done);
return ops;
}, {});
}
}
module.exports = APIController;
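// Usage sketch (not part of the original file): an operation handler is a
// module under ./API/<tag>/<operationId>.js exporting an async function that
// receives the request context and the extracted parameters, and reports its
// result through ctx.results. `exampleOperation` is a hypothetical name; it is
// only picked up if declared in the openapi spec.
async function exampleOperation(ctx, params) {
    // eslint-disable-next-line no-param-reassign
    ctx.results.statusCode = 200;
    // eslint-disable-next-line no-param-reassign
    ctx.results.body = { received: params };
}
// module.exports = exampleOperation; // saved as ./API/<tag>/exampleOperation.js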

View File

@ -1,101 +0,0 @@
const http = require('http');
const https = require('https');
const express = require('express');
const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process');
const config = require('../config');
const { initializeOasTools, middleware } = require('./middleware');
const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'server',
});
class UtapiServer extends Process {
constructor() {
super();
this._app = null;
this._server = null;
}
static async _createApp(spec) {
const app = express();
app.use(bodyParser.json({ strict: false }));
app.use(middleware.loggerMiddleware);
await initializeOasTools(spec, app);
app.use(middleware.errorMiddleware);
app.use(middleware.httpMetricsMiddleware);
app.use(middleware.responseLoggerMiddleware);
return app;
}
static _createHttpsAgent() {
const conf = {
ciphers: ciphers.ciphers,
dhparam: dhparam.dhparam,
cert: config.tls.cert,
key: config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null,
requestCert: false,
rejectUnauthorized: true,
};
const agent = new https.Agent(conf);
conf.agent = agent;
return conf;
}
static async _createServer(app) {
if (config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app);
}
return http.createServer(app);
}
static async _startServer(server) {
moduleLogger
.with({
method: 'UtapiServer::_startServer',
cacheBackend: config.cacheBackend,
})
.info(`Server listening on ${config.port}`);
await server.listen(config.port);
}
async _setup() {
this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app);
}
async _start() {
await cacheClient.connect();
await UtapiServer._startServer(this._server);
}
async _join() {
await this._server.close();
await cacheClient.disconnect();
}
}
async function startServer(conf) {
const server = new UtapiServer(conf);
try {
await server.setup();
await server.start();
} catch (error) {
moduleLogger
.with({ method: 'UtapiServer::startServer' })
.error('Unhandled Error!', { error: error.message });
await server.join();
throw error;
}
}
module.exports = {
UtapiServer,
startServer,
};

View File

@ -1,20 +0,0 @@
const promClient = require('prom-client');
const httpRequestsTotal = new promClient.Counter({
name: 's3_utapi_http_requests_total',
help: 'Total number of HTTP requests',
labelNames: ['action', 'code'],
});
const httpRequestDurationSeconds = new promClient.Histogram({
name: 's3_utapi_http_request_duration_seconds',
help: 'Duration of HTTP requests in seconds',
labelNames: ['action', 'code'],
// buckets for response time from 0.1ms to 60s
buckets: [0.0001, 0.005, 0.015, 0.05, 0.1, 0.2, 0.3, 0.4, 0.5, 1.0, 5.0, 15.0, 30.0, 60.0],
});
module.exports = {
httpRequestDurationSeconds,
httpRequestsTotal,
};
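// Usage sketch (not part of the original file): RequestContext starts the
// histogram timer per request and the HTTP metrics middleware ends it with the
// response code. A standalone measurement looks like this; the action label is
// illustrative.
async function timedExample(work) {
    const end = httpRequestDurationSeconds.startTimer({ action: 'listMetrics' });
    try {
        return await work();
    } finally {
        end({ code: 200 });
        httpRequestsTotal.labels({ action: 'listMetrics', code: 200 }).inc(1);
    }
}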

View File

@ -1,181 +0,0 @@
const oasTools = require('oas-tools');
const path = require('path');
const { promisify } = require('util');
const { ipCheck } = require('arsenal');
const config = require('../config');
const { logger, buildRequestLogger } = require('../utils');
const errors = require('../errors');
const { translateAndAuthorize } = require('../vault');
const metricHandlers = require('./metrics');
const oasOptions = {
controllers: path.join(__dirname, './API/'),
checkControllers: true,
loglevel: config.logging.level === 'trace' ? 'debug' : 'info', // oasTools is very verbose
customLogger: logger,
customErrorHandling: true,
strict: true,
router: true,
validator: true,
docs: {
apiDocs: '/openapi.json',
apiDocsPrefix: '',
},
ignoreUnknownFormats: true,
};
// If in development mode, enable the swagger ui
if (config.development) {
oasOptions.docs = {
swaggerUi: '/_/docs',
swaggerUiPrefix: '',
...oasOptions.docs,
};
}
async function initializeOasTools(spec, app) {
oasTools.configure(oasOptions);
return promisify(oasTools.initialize)(spec, app);
}
function loggerMiddleware(req, res, next) {
// eslint-disable-next-line no-param-reassign
req.logger = buildRequestLogger(req);
req.logger.info('Received request');
return next();
}
function responseLoggerMiddleware(req, res, next) {
const info = {
httpCode: res.statusCode,
httpMessage: res.statusMessage,
};
req.logger.end('finished handling request', info);
if (next !== undefined) {
next();
}
}
function httpMetricsMiddleware(request, response, next) {
// If the request.ctx is undefined then this is an internal oasTools request (/_/docs)
// No metrics should be pushed
if (config.metrics.enabled && request.ctx && request.ctx.tag !== 'internal') {
metricHandlers.httpRequestsTotal
.labels({
action: request.ctx.operationId,
code: response.statusCode,
}).inc(1);
request.ctx.requestTimer({ code: response.statusCode });
}
if (next) {
next();
}
}
// next is purposely not called as all error responses are handled here
// eslint-disable-next-line no-unused-vars
function errorMiddleware(err, req, res, next) {
let statusCode = err.code || 500;
let code = err.message || 'InternalError';
let message = err.description || 'Internal Error';
// failed request validation by oas-tools
if (err.failedValidation) {
        // You can't actually use destructuring here
/* eslint-disable prefer-destructuring */
statusCode = errors.InvalidRequest.code;
code = errors.InvalidRequest.message;
message = errors.InvalidRequest.description;
/* eslint-enable prefer-destructuring */
}
if (!err.utapiError && !config.development) {
// Make sure internal errors don't leak when not in development
code = 'InternalError';
message = 'Internal Error';
}
res.status(statusCode).send({
code,
message,
});
responseLoggerMiddleware(req, res, () => httpMetricsMiddleware(req, res));
}
// eslint-disable-next-line no-unused-vars
async function authV4Middleware(request, response, params) {
const authHeader = request.headers.authorization;
if (!authHeader || !authHeader.startsWith('AWS4-')) {
request.logger.error('missing auth header for v4 auth');
throw errors.InvalidRequest.customizeDescription('Must use Auth V4 for this request.');
}
let action = 'ListMetrics';
let requestedResources = [];
switch (request.ctx.operationId) {
case 'listMetrics':
requestedResources = params.body[params.level];
action = params.Action;
break;
default:
requestedResources = [params.resource];
break;
}
if (requestedResources.length === 0) {
throw errors.InvalidRequest.customizeDescription('You must specify at least one resource');
}
let passed;
let authorizedResources;
try {
[passed, authorizedResources] = await translateAndAuthorize(request, action, params.level, requestedResources);
} catch (error) {
request.logger.error('error during authentication', { error });
// rethrow any access denied errors
if ((error.is && error.is.AccessDenied) || (error.utapiError && error.AccessDenied)) {
throw error;
}
throw errors.InternalError;
}
if (!passed) {
request.logger.trace('not authorized to access any requested resources');
throw errors.AccessDenied;
}
switch (request.ctx.operationId) {
case 'listMetrics':
params.body[params.level] = authorizedResources;
break;
default:
[params.resource] = authorizedResources;
break;
}
}
async function clientIpLimitMiddleware(request) {
const allowIp = ipCheck.ipMatchCidrList(
config.healthChecks.allowFrom, request.ip,
);
if (!allowIp) {
throw errors.AccessDenied.customizeDescription('unauthorized origin ip on request');
}
}
module.exports = {
initializeOasTools,
middleware: {
loggerMiddleware,
errorMiddleware,
responseLoggerMiddleware,
authV4Middleware,
clientIpLimitMiddleware,
httpMetricsMiddleware,
},
};

View File

@ -1,84 +0,0 @@
const fs = require('fs');
const path = require('path');
const http = require('http');
const jsyaml = require('js-yaml');
const httpMethods = http.METHODS.map(i => i.toLowerCase());
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'server.spec',
});
function _loadOpenApiSpec() {
const spec = fs.readFileSync(path.join(__dirname, '../../openapi.yaml'), 'utf8');
return jsyaml.safeLoad(spec);
}
function _getApiOperationIds(routes) {
return Object.keys(routes)
.reduce((optIds, path) => {
httpMethods.forEach(method => {
if (routes[path][method] !== undefined) {
const tag = routes[path][method]['x-router-controller'];
const optId = routes[path][method].operationId;
moduleLogger
.with({ method: '_getApiOperationIds' })
.trace('Registering handler', { tag, operationId: optId });
if (optIds[tag] === undefined) {
// eslint-disable-next-line no-param-reassign
optIds[tag] = new Set([optId]);
} else {
optIds[tag].add(optId);
}
}
});
return optIds;
}, {});
}
function _getApiOperationMiddleware(routes) {
return Object.values(routes)
.reduce((optIds, ops) => {
Object.entries(ops)
.filter(([method]) => httpMethods.includes(method))
// eslint-disable-next-line no-unused-vars
.forEach(([_, op]) => {
const middleware = {};
const tag = op['x-router-controller'];
if (optIds[tag] === undefined) {
optIds[tag] = {};
}
if (op['x-authv4'] === true) {
middleware.authv4 = true;
}
if (op['x-iplimit'] === true) {
middleware.iplimit = true;
}
optIds[tag][op.operationId] = middleware;
moduleLogger
.with({ method: '_getApiOperationMiddleware' })
.trace('Registering middleware for handler', {
tag,
operationId: op.operationId,
middleware: Object.keys(middleware),
});
});
return optIds;
}, {});
}
const spec = _loadOpenApiSpec();
const apiOperations = _getApiOperationIds(spec.paths);
const apiOperationMiddleware = _getApiOperationMiddleware(spec.paths);
module.exports = {
spec,
apiOperations,
apiOperationMiddleware,
};

View File

@ -1,208 +0,0 @@
const assert = require('assert');
const cron = require('node-schedule');
const cronparser = require('cron-parser');
const promClient = require('prom-client');
const { DEFAULT_METRICS_ROUTE } = require('arsenal').network.probe.ProbeServer;
const { client: cacheClient } = require('../cache');
const Process = require('../process');
const { LoggerContext, iterIfError, startProbeServer } = require('../utils');
const logger = new LoggerContext({
module: 'BaseTask',
});
class Now {}
class BaseTask extends Process {
constructor(options) {
super();
assert.notStrictEqual(options, undefined);
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
this._cache = cacheClient;
this._warp10Clients = options.warp10;
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
this._enableMetrics = options.enableMetrics || false;
this._metricsHost = options.metricsHost || 'localhost';
this._metricsPort = options.metricsPort || 9001;
this._metricsHandlers = null;
this._probeServer = null;
}
async _setup(includeDefaultOpts = true) {
if (includeDefaultOpts) {
this._program
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
.option(
'-s, --schedule <crontab>',
'Execute task using this crontab. Overrides configured schedule',
value => {
cronparser.parseExpression(value);
return value;
},
)
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
                .option('--node-id <id>', 'Set a custom node id'); // no short flag: -n is taken by --now
}
if (this._enableMetrics) {
promClient.collectDefaultMetrics({
timeout: 10000,
gcDurationBuckets: [0.001, 0.01, 0.1, 1, 2, 5],
});
this._metricsHandlers = {
...this._registerDefaultMetricHandlers(),
...this._registerMetricHandlers(),
};
await this._createProbeServer();
}
}
_registerDefaultMetricHandlers() {
const taskName = this.constructor.name;
        // Get the name of our subclass in snake case format, e.g. BaseClass => _base_class
const taskNameSnake = taskName.replace(/[A-Z]/g, letter => `_${letter.toLowerCase()}`);
const executionDuration = new promClient.Gauge({
name: `s3_utapi${taskNameSnake}_duration_seconds`,
help: `Execution time of the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionAttempts = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_attempts_total`,
help: `Total number of attempts to execute the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
const executionFailures = new promClient.Counter({
name: `s3_utapi${taskNameSnake}_failures_total`,
help: `Total number of failures executing the ${taskName} task`,
labelNames: ['origin', 'containerName'],
});
return {
executionDuration,
executionAttempts,
executionFailures,
};
}
// eslint-disable-next-line class-methods-use-this
_registerMetricHandlers() {
return {};
}
async _createProbeServer() {
this._probeServer = await startProbeServer({
bindAddress: this._metricsHost,
port: this._metricsPort,
});
this._probeServer.addHandler(
DEFAULT_METRICS_ROUTE,
(res, log) => {
log.debug('metrics requested');
res.writeHead(200, {
'Content-Type': promClient.register.contentType,
});
promClient.register.metrics().then(metrics => {
res.end(metrics);
});
},
);
}
get schedule() {
if (this._program.now) {
return Now;
}
if (this._program.schedule) {
return this._program.schedule;
}
return this._defaultSchedule;
}
get lag() {
if (this._program.lag !== undefined) {
return this._program.lag;
}
return this._defaultLag;
}
async _start() {
await this._cache.connect();
if (this.schedule === Now) {
setImmediate(async () => {
await this.execute();
this.join();
});
} else {
this._scheduler = cron.scheduleJob(this.schedule,
async () => {
this._scheduler.cancel(); // Halt execution to avoid overlapping tasks
await this.execute();
this._scheduler.reschedule(this.schedule);
});
this.on('exit', () => {
this._scheduler.cancel();
});
}
}
async execute() {
let endTimer;
if (this._enableMetrics) {
endTimer = this._metricsHandlers.executionDuration.startTimer();
this._metricsHandlers.executionAttempts.inc(1);
}
try {
            const timestamp = Date.now() * 1000; // timestamp in microseconds
const laggedTimestamp = timestamp - (this.lag * 1000000);
await this._execute(laggedTimestamp);
        } catch (error) {
            logger.error('Error during task execution', { error });
            if (this._enableMetrics) {
                this._metricsHandlers.executionFailures.inc(1);
            }
        }
if (this._enableMetrics) {
endTimer();
}
}
// eslint-disable-next-line class-methods-use-this
async _execute(timestamp) {
logger.info(`Default Task ${timestamp}`);
}
async _join() {
if (this._probeServer !== null) {
this._probeServer.stop();
}
return this._cache.disconnect();
}
withWarp10(func, onError) {
return iterIfError(this._warp10Clients, func, error => {
if (onError) {
onError(error);
} else {
const {
name, code, message, stack,
} = error;
logger.error('error during warp 10 request', {
error: {
name, code, errmsg: message, stack: name !== 'RequestError' ? stack : undefined,
},
});
}
});
}
}
module.exports = BaseTask;
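// Usage sketch (not part of the original file): a minimal task built on the
// class above. The schedule, lag and warp 10 client source are assumptions;
// _execute receives the lagged timestamp in microseconds computed by execute().
class ExampleTask extends BaseTask {
    constructor(options) {
        // options.warp10 must be the array of warp 10 clients,
        // e.g. require('../warp10').clients (path assumed)
        super(options);
        this._defaultSchedule = '*/30 * * * * *'; // node-schedule crontab: every 30 seconds
        this._defaultLag = 5; // seconds subtracted from "now" before _execute runs
    }

    // eslint-disable-next-line class-methods-use-this
    async _execute(timestamp) {
        logger.info('example task run', { timestamp });
    }
}
// const task = new ExampleTask({ warp10: warp10Clients, enableMetrics: false });
// task.setup().then(() => task.start());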

Some files were not shown because too many files have changed in this diff