Compare commits

..

1 Commits

Author SHA1 Message Date
Taylor McKinnon 943b2f239b remove secrets: inherit 2024-05-15 10:26:28 -07:00
34 changed files with 5515 additions and 828 deletions

View File

@ -1,87 +0,0 @@
# General support information
GitHub Issues are **reserved** for actionable bug reports (including
documentation inaccuracies), and feature requests.
**All questions** (regarding configuration, usecases, performance, community,
events, setup and usage recommendations, among other things) should be asked on
the **[Zenko Forum](http://forum.zenko.io/)**.
> Questions opened as GitHub issues will systematically be closed, and moved to
> the [Zenko Forum](http://forum.zenko.io/).
--------------------------------------------------------------------------------
## Avoiding duplicates
When reporting a new issue/requesting a feature, make sure that we do not have
any duplicates already open:
- search the issue list for this repository (use the search bar, select
"Issues" on the left pane after searching);
- if there is a duplicate, please do not open your issue, and add a comment
to the existing issue instead.
--------------------------------------------------------------------------------
## Bug report information
(delete this section (everything between the lines) if you're not reporting a
bug but requesting a feature)
### Description
Briefly describe the problem you are having in a few paragraphs.
### Steps to reproduce the issue
Please provide steps to reproduce, including full log output
### Actual result
Describe the results you received
### Expected result
Describe the results you expected
### Additional information
- Node.js version,
- Docker version,
- npm version,
- distribution/OS,
- optional: anything else you deem helpful to us.
--------------------------------------------------------------------------------
## Feature Request
(delete this section (everything between the lines) if you're not requesting
a feature but reporting a bug)
### Proposal
Describe the feature
### Current behavior
What currently happens
### Desired behavior
What you would like to happen
### Usecase
Please provide usecases for changing the current behavior
### Additional information
- Is this request for your company? Y/N
- If Y: Company name:
- Are you using any Scality Enterprise Edition products (RING, Zenko EE)? Y/N
- Are you willing to contribute this feature yourself?
- Position/Title:
- How did you hear about us?
--------------------------------------------------------------------------------

View File

@ -30,16 +30,16 @@ jobs:
file: ${{ github.event.inputs.dockerfile}} file: ${{ github.event.inputs.dockerfile}}
tag: ${{ github.event.inputs.tag }} tag: ${{ github.event.inputs.tag }}
release: # release:
if: ${{ inputs.create-github-release }} # if: ${{ inputs.create-github-release }}
runs-on: ubuntu-latest # runs-on: ubuntu-latest
needs: build # needs: build
steps: # steps:
- uses: softprops/action-gh-release@v2 # - uses: softprops/action-gh-release@v2
env: # env:
GITHUB_TOKEN: ${{ github.token }} # GITHUB_TOKEN: ${{ github.token }}
with: # with:
name: Release ${{ github.event.inputs.tag }} # name: Release ${{ github.event.inputs.tag }}
tag_name: ${{ github.event.inputs.tag }} # tag_name: ${{ github.event.inputs.tag }}
generate_release_notes: true # generate_release_notes: true
target_commitish: ${{ github.sha }} # target_commitish: ${{ github.sha }}

View File

@ -3,8 +3,9 @@
![Utapi logo](res/utapi-logo.png) ![Utapi logo](res/utapi-logo.png)
[![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi) [![Circle CI][badgepub]](https://circleci.com/gh/scality/utapi)
[![Scality CI][badgepriv]](http://ci.ironmann.io/gh/scality/utapi)
Service Utilization API for tracking resource usage and metrics reporting. Service Utilization API for tracking resource usage and metrics reporting
## Design ## Design
@ -87,13 +88,13 @@ Server is running.
1. Create an IAM user 1. Create an IAM user
``` ```
aws iam --endpoint-url <endpoint> create-user --user-name <user-name> aws iam --endpoint-url <endpoint> create-user --user-name utapiuser
``` ```
2. Create access key for the user 2. Create access key for the user
``` ```
aws iam --endpoint-url <endpoint> create-access-key --user-name <user-name> aws iam --endpoint-url <endpoint> create-access-key --user-name utapiuser
``` ```
3. Define a managed IAM policy 3. Define a managed IAM policy
@ -202,11 +203,12 @@ Server is running.
5. Attach user to the managed policy 5. Attach user to the managed policy
``` ```
aws --endpoint-url <endpoint> iam attach-user-policy --user-name aws --endpoint-url <endpoint> iam attach-user-policy --user-name utapiuser
<user-name> --policy-arn <policy arn> --policy-arn <policy arn>
``` ```
Now the user has access to ListMetrics request in Utapi on all buckets. Now the user `utapiuser` has access to ListMetrics request in Utapi on all
buckets.
### Signing request with Auth V4 ### Signing request with Auth V4
@ -222,18 +224,16 @@ following urls for reference.
You may also view examples making a request with Auth V4 using various languages You may also view examples making a request with Auth V4 using various languages
and AWS SDKs [here](/examples). and AWS SDKs [here](/examples).
Alternatively, you can use a nifty command line tool available in Scality's Alternatively, you can use a nifty command line tool available in Scality's S3.
CloudServer.
You can git clone the CloudServer repo from here You can git clone S3 repo from here https://github.com/scality/S3.git and follow
https://github.com/scality/cloudserver and follow the instructions in the README the instructions in README to install the dependencies.
to install the dependencies.
If you have CloudServer running inside a docker container you can docker exec If you have S3 running inside a docker container you can docker exec into the S3
into the CloudServer container as container as
``` ```
docker exec -it <container-id> bash docker exec -it <container id> bash
``` ```
and then run the command and then run the command
@ -271,7 +271,7 @@ Usage: list_metrics [options]
-v, --verbose -v, --verbose
``` ```
An example call to list metrics for a bucket `demo` to Utapi in a https enabled A typical call to list metrics for a bucket `demo` to Utapi in a https enabled
deployment would be deployment would be
``` ```
@ -283,7 +283,7 @@ Both start and end times are time expressed as UNIX epoch timestamps **expressed
in milliseconds**. in milliseconds**.
Keep in mind, since Utapi metrics are normalized to the nearest 15 min. Keep in mind, since Utapi metrics are normalized to the nearest 15 min.
interval, start time and end time need to be in the specific format as follows. interval, so start time and end time need to be in specific format as follows.
#### Start time #### Start time
@ -297,7 +297,7 @@ Date: Tue Oct 11 2016 17:35:25 GMT-0700 (PDT)
Unix timestamp (milliseconds): 1476232525320 Unix timestamp (milliseconds): 1476232525320
Here's an example JS method to get a start timestamp Here's a typical JS method to get start timestamp
```javascript ```javascript
function getStartTimestamp(t) { function getStartTimestamp(t) {
@ -317,7 +317,7 @@ seconds and milliseconds set to 59 and 999 respectively. So valid end timestamps
would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and would look something like `09:14:59:999`, `09:29:59:999`, `09:44:59:999` and
`09:59:59:999`. `09:59:59:999`.
Here's an example JS method to get an end timestamp Here's a typical JS method to get end timestamp
```javascript ```javascript
function getEndTimestamp(t) { function getEndTimestamp(t) {
@ -342,3 +342,4 @@ In order to contribute, please follow the
https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md). https://github.com/scality/Guidelines/blob/master/CONTRIBUTING.md).
[badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg [badgepub]: http://circleci.com/gh/scality/utapi.svg?style=svg
[badgepriv]: http://ci.ironmann.io/gh/scality/utapi.svg?style=svg

View File

@ -1,47 +0,0 @@
#!/bin/bash
# set -e stops the execution of a script if a command or pipeline has an error
set -e
# modifying config.json
JQ_FILTERS_CONFIG="."
if [[ "$LOG_LEVEL" ]]; then
if [[ "$LOG_LEVEL" == "info" || "$LOG_LEVEL" == "debug" || "$LOG_LEVEL" == "trace" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .log.logLevel=\"$LOG_LEVEL\""
echo "Log level has been modified to $LOG_LEVEL"
else
echo "The log level you provided is incorrect (info/debug/trace)"
fi
fi
if [[ "$WORKERS" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .workers=\"$WORKERS\""
fi
if [[ "$REDIS_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.host=\"$REDIS_HOST\""
fi
if [[ "$REDIS_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .redis.port=\"$REDIS_PORT\""
fi
if [[ "$VAULTD_HOST" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.host=\"$VAULTD_HOST\""
fi
if [[ "$VAULTD_PORT" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .vaultd.port=\"$VAULTD_PORT\""
fi
if [[ "$HEALTHCHECKS_ALLOWFROM" ]]; then
JQ_FILTERS_CONFIG="$JQ_FILTERS_CONFIG | .healthChecks.allowFrom=[\"$HEALTHCHECKS_ALLOWFROM\"]"
fi
if [[ $JQ_FILTERS_CONFIG != "." ]]; then
jq "$JQ_FILTERS_CONFIG" config.json > config.json.tmp
mv config.json.tmp config.json
fi
exec "$@"

View File

@ -1,42 +0,0 @@
# Utapi Release Plan
## Docker Image Generation
Docker images are hosted on [ghcr.io](https://github.com/orgs/scality/packages).
Utapi has one namespace there:
* Namespace: ghcr.io/scality/utapi
With every CI build, the CI will push images, tagging the
content with the developer branch's short SHA-1 commit hash.
This allows those images to be used by developers, CI builds,
build chain and so on.
Tagged versions of utapi will be stored in the production namespace.
## How to Pull Docker Images
```sh
docker pull ghcr.io/scality/utapi:<commit hash>
docker pull ghcr.io/scality/utapi:<tag>
```
## Release Process
To release a production image:
* Name the tag for the repository and Docker image.
* Use the `yarn version` command with the same tag to update `package.json`.
* Create a PR and merge the `package.json` change.
* Tag the repository using the same tag.
* [Force a build] using:
* A given branch that ideally matches the tag.
* The `release` stage.
* An extra property with the name `tag` and its value being the actual tag.
[Force a build]:
https://eve.devsca.com/github/scality/utapi/#/builders/bootstrap/force/force

View File

@ -1,90 +0,0 @@
import sys, os, base64, datetime, hashlib, hmac, datetime, calendar, json
import requests # pip install requests
access_key = '9EQTVVVCLSSG6QBMNKO5'
secret_key = 'T5mK/skkkwJ/mTjXZnHyZ5UzgGIN=k9nl4dyTmDH'
method = 'POST'
service = 's3'
host = 'localhost:8100'
region = 'us-east-1'
canonical_uri = '/buckets'
canonical_querystring = 'Action=ListMetrics&Version=20160815'
content_type = 'application/x-amz-json-1.0'
algorithm = 'AWS4-HMAC-SHA256'
t = datetime.datetime.utcnow()
amz_date = t.strftime('%Y%m%dT%H%M%SZ')
date_stamp = t.strftime('%Y%m%d')
# Key derivation functions. See:
# http://docs.aws.amazon.com/general/latest/gr/signature-v4-examples.html#signature-v4-examples-python
def sign(key, msg):
return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()
def getSignatureKey(key, date_stamp, regionName, serviceName):
kDate = sign(('AWS4' + key).encode('utf-8'), date_stamp)
kRegion = sign(kDate, regionName)
kService = sign(kRegion, serviceName)
kSigning = sign(kService, 'aws4_request')
return kSigning
def get_start_time(t):
start = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(start.utctimetuple()) * 1000;
def get_end_time(t):
end = t.replace(minute=t.minute - t.minute % 15, second=0, microsecond=0)
return calendar.timegm(end.utctimetuple()) * 1000 - 1;
start_time = get_start_time(datetime.datetime(2016, 1, 1, 0, 0, 0, 0))
end_time = get_end_time(datetime.datetime(2016, 2, 1, 0, 0, 0, 0))
# Request parameters for listing Utapi bucket metrics--passed in a JSON block.
bucketListing = {
'buckets': [ 'utapi-test' ],
'timeRange': [ start_time, end_time ],
}
request_parameters = json.dumps(bucketListing)
payload_hash = hashlib.sha256(request_parameters).hexdigest()
canonical_headers = \
'content-type:{0}\nhost:{1}\nx-amz-content-sha256:{2}\nx-amz-date:{3}\n' \
.format(content_type, host, payload_hash, amz_date)
signed_headers = 'content-type;host;x-amz-content-sha256;x-amz-date'
canonical_request = '{0}\n{1}\n{2}\n{3}\n{4}\n{5}' \
.format(method, canonical_uri, canonical_querystring, canonical_headers,
signed_headers, payload_hash)
credential_scope = '{0}/{1}/{2}/aws4_request' \
.format(date_stamp, region, service)
string_to_sign = '{0}\n{1}\n{2}\n{3}' \
.format(algorithm, amz_date, credential_scope,
hashlib.sha256(canonical_request).hexdigest())
signing_key = getSignatureKey(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, (string_to_sign).encode('utf-8'),
hashlib.sha256).hexdigest()
authorization_header = \
'{0} Credential={1}/{2}, SignedHeaders={3}, Signature={4}' \
.format(algorithm, access_key, credential_scope, signed_headers, signature)
# The 'host' header is added automatically by the Python 'requests' library.
headers = {
'Content-Type': content_type,
'X-Amz-Content-Sha256': payload_hash,
'X-Amz-Date': amz_date,
'Authorization': authorization_header
}
endpoint = 'http://' + host + canonical_uri + '?' + canonical_querystring;
r = requests.post(endpoint, data=request_parameters, headers=headers)
print (r.text)

1
images/warp10/.gitattributes vendored Normal file
View File

@ -0,0 +1 @@
*.jar filter=lfs diff=lfs merge=lfs -text

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:389d2135867c399a389901460c5f2cc09f4857d0c6d08632c2638c25fb150c46
size 15468553

View File

@ -1,4 +1,5 @@
/* eslint-disable global-require */ /* eslint-disable global-require */
// eslint-disable-line strict // eslint-disable-line strict
let toExport; let toExport;

View File

@ -1,13 +1,35 @@
/* eslint-disable no-bitwise */ /* eslint-disable no-bitwise */
const assert = require('assert'); const assert = require('assert');
const fs = require('fs'); const fs = require('fs');
const path = require('path');
/** /**
* Reads from a config file and returns the content as a config object * Reads from a config file and returns the content as a config object
*/ */
class Config { class Config {
constructor(config) { constructor() {
this.component = config.component; /*
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
this._basePath = path.resolve(__dirname, '..');
this.path = `${this._basePath}/config.json`;
if (process.env.UTAPI_CONFIG_FILE !== undefined) {
this.path = process.env.UTAPI_CONFIG_FILE;
}
// Read config automatically
this._getConfig();
}
_getConfig() {
let config;
try {
const data = fs.readFileSync(this.path, { encoding: 'utf-8' });
config = JSON.parse(data);
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
this.port = 9500; this.port = 9500;
if (config.port !== undefined) { if (config.port !== undefined) {
@ -93,26 +115,18 @@ class Config {
} }
} }
if (config.vaultclient) { this.vaultd = {};
// Instance passed from outside if (config.vaultd) {
this.vaultclient = config.vaultclient; if (config.vaultd.port !== undefined) {
this.vaultd = null; assert(Number.isInteger(config.vaultd.port)
} else { && config.vaultd.port > 0,
// Connection data 'bad config: vaultd port must be a positive integer');
this.vaultclient = null; this.vaultd.port = config.vaultd.port;
this.vaultd = {}; }
if (config.vaultd) { if (config.vaultd.host !== undefined) {
if (config.vaultd.port !== undefined) { assert.strictEqual(typeof config.vaultd.host, 'string',
assert(Number.isInteger(config.vaultd.port) 'bad config: vaultd host must be a string');
&& config.vaultd.port > 0, this.vaultd.host = config.vaultd.host;
'bad config: vaultd port must be a positive integer');
this.vaultd.port = config.vaultd.port;
}
if (config.vaultd.host !== undefined) {
assert.strictEqual(typeof config.vaultd.host, 'string',
'bad config: vaultd host must be a string');
this.vaultd.host = config.vaultd.host;
}
} }
} }
@ -127,11 +141,12 @@ class Config {
const { key, cert, ca } = config.certFilePaths const { key, cert, ca } = config.certFilePaths
? config.certFilePaths : {}; ? config.certFilePaths : {};
if (key && cert) { if (key && cert) {
const keypath = key; const keypath = (key[0] === '/') ? key : `${this._basePath}/${key}`;
const certpath = cert; const certpath = (cert[0] === '/')
? cert : `${this._basePath}/${cert}`;
let capath; let capath;
if (ca) { if (ca) {
capath = ca; capath = (ca[0] === '/') ? ca : `${this._basePath}/${ca}`;
assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK), assert.doesNotThrow(() => fs.accessSync(capath, fs.F_OK | fs.R_OK),
`File not found or unreachable: ${capath}`); `File not found or unreachable: ${capath}`);
} }
@ -163,7 +178,9 @@ class Config {
'bad config: onlyCountLatestWhenObjectLocked must be a boolean'); 'bad config: onlyCountLatestWhenObjectLocked must be a boolean');
this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked; this.onlyCountLatestWhenObjectLocked = config.onlyCountLatestWhenObjectLocked;
} }
return config;
} }
} }
module.exports = Config; module.exports = new Config();

View File

@ -81,17 +81,6 @@ class Datastore {
return this._client.call((backend, done) => backend.incr(key, done), cb); return this._client.call((backend, done) => backend.incr(key, done), cb);
} }
/**
* increment value of a key by the provided value
* @param {string} key - key holding the value
* @param {string} value - value containing the data
* @param {callback} cb - callback
* @return {undefined}
*/
incrby(key, value, cb) {
return this._client.call((backend, done) => backend.incrby(key, value, done), cb);
}
/** /**
* decrement value of a key by 1 * decrement value of a key by 1
* @param {string} key - key holding the value * @param {string} key - key holding the value

View File

@ -6,6 +6,8 @@ const async = require('async');
const { errors } = require('arsenal'); const { errors } = require('arsenal');
const { getMetricFromKey, getKeys, generateStateKey } = require('./schema'); const { getMetricFromKey, getKeys, generateStateKey } = require('./schema');
const s3metricResponseJSON = require('../models/s3metricResponse'); const s3metricResponseJSON = require('../models/s3metricResponse');
const config = require('./Config');
const Vault = require('./Vault');
const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month. const MAX_RANGE_MS = (((1000 * 60) * 60) * 24) * 30; // One month.
@ -21,6 +23,7 @@ class ListMetrics {
constructor(metric, component) { constructor(metric, component) {
this.metric = metric; this.metric = metric;
this.service = component; this.service = component;
this.vault = new Vault(config);
} }
/** /**
@ -80,10 +83,9 @@ class ListMetrics {
const resources = validator.get(this.metric); const resources = validator.get(this.metric);
const timeRange = validator.get('timeRange'); const timeRange = validator.get('timeRange');
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids // map account ids to canonical ids
if (this.metric === 'accounts') { if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => { return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) { if (err) {
return cb(err); return cb(err);
} }
@ -122,11 +124,10 @@ class ListMetrics {
const fifteenMinutes = 15 * 60 * 1000; // In milliseconds const fifteenMinutes = 15 * 60 * 1000; // In milliseconds
const timeRange = [start - fifteenMinutes, end]; const timeRange = [start - fifteenMinutes, end];
const datastore = utapiRequest.getDatastore(); const datastore = utapiRequest.getDatastore();
const vault = utapiRequest.getVault();
// map account ids to canonical ids // map account ids to canonical ids
if (this.metric === 'accounts') { if (this.metric === 'accounts') {
return vault.getCanonicalIds(resources, log, (err, list) => { return this.vault.getCanonicalIds(resources, log, (err, list) => {
if (err) { if (err) {
return cb(err); return cb(err);
} }

View File

@ -99,7 +99,6 @@ const metricObj = {
buckets: 'bucket', buckets: 'bucket',
accounts: 'accountId', accounts: 'accountId',
users: 'userId', users: 'userId',
location: 'location',
}; };
class UtapiClient { class UtapiClient {
@ -123,17 +122,13 @@ class UtapiClient {
const api = (config || {}).logApi || werelogs; const api = (config || {}).logApi || werelogs;
this.log = new api.Logger('UtapiClient'); this.log = new api.Logger('UtapiClient');
// By default, we push all resource types // By default, we push all resource types
this.metrics = ['buckets', 'accounts', 'users', 'service', 'location']; this.metrics = ['buckets', 'accounts', 'users', 'service'];
this.service = 's3'; this.service = 's3';
this.disableOperationCounters = false; this.disableOperationCounters = false;
this.enabledOperationCounters = []; this.enabledOperationCounters = [];
this.disableClient = true; this.disableClient = true;
if (config && !config.disableClient) { if (config && !config.disableClient) {
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
if (config.metrics) { if (config.metrics) {
const message = 'invalid property in UtapiClient configuration'; const message = 'invalid property in UtapiClient configuration';
assert(Array.isArray(config.metrics), `${message}: metrics ` assert(Array.isArray(config.metrics), `${message}: metrics `
@ -161,6 +156,9 @@ class UtapiClient {
if (config.enabledOperationCounters) { if (config.enabledOperationCounters) {
this.enabledOperationCounters = config.enabledOperationCounters; this.enabledOperationCounters = config.enabledOperationCounters;
} }
this.disableClient = false;
this.expireMetrics = config.expireMetrics;
this.expireMetricsTTL = config.expireMetricsTTL || 0;
} }
} }
@ -1156,69 +1154,6 @@ class UtapiClient {
}); });
} }
/**
*
* @param {string} location - name of data location
* @param {number} updateSize - size in bytes to update location metric by,
* could be negative, indicating deleted object
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
pushLocationMetric(location, updateSize, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
this._checkMetricTypes(params);
const action = (updateSize < 0) ? 'decrby' : 'incrby';
const size = (updateSize < 0) ? -updateSize : updateSize;
return this.ds[action](generateKey(params, 'locationStorage'), size,
err => {
if (err) {
log.error('error pushing metric', {
method: 'UtapiClient.pushLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
return callback();
});
}
/**
*
* @param {string} location - name of data backend to get metric for
* @param {string} reqUid - Request Unique Identifier
* @param {function} callback - callback to call
* @return {undefined}
*/
getLocationMetric(location, reqUid, callback) {
const log = this.log.newRequestLoggerFromSerializedUids(reqUid);
const params = {
level: 'location',
service: 's3',
location,
};
const redisKey = generateKey(params, 'locationStorage');
return this.ds.get(redisKey, (err, bytesStored) => {
if (err) {
log.error('error getting metric', {
method: 'UtapiClient: getLocationMetric',
error: err,
});
return callback(errors.InternalError);
}
// if err and bytesStored are null, key does not exist yet
if (bytesStored === null) {
return callback(null, 0);
}
return callback(null, bytesStored);
});
}
/** /**
* Get storage used by bucket/account/user/service * Get storage used by bucket/account/user/service
* @param {object} params - params for the metrics * @param {object} params - params for the metrics

View File

@ -16,19 +16,15 @@ const REINDEX_PYTHON_INTERPRETER = process.env.REINDEX_PYTHON_INTERPRETER !== un
? process.env.REINDEX_PYTHON_INTERPRETER ? process.env.REINDEX_PYTHON_INTERPRETER
: 'python3.7'; : 'python3.7';
const EXIT_CODE_SENTINEL_CONNECTION = 100;
class UtapiReindex { class UtapiReindex {
constructor(config) { constructor(config) {
this._enabled = false; this._enabled = false;
this._schedule = REINDEX_SCHEDULE; this._schedule = REINDEX_SCHEDULE;
this._redis = { this._sentinel = {
host: '127.0.0.1',
port: 16379,
name: 'scality-s3', name: 'scality-s3',
sentinelPassword: '', sentinelPassword: '',
sentinels: [{
host: '127.0.0.1',
port: 16379,
}],
}; };
this._bucketd = { this._bucketd = {
host: '127.0.0.1', host: '127.0.0.1',
@ -46,13 +42,14 @@ class UtapiReindex {
if (config && config.password) { if (config && config.password) {
this._password = config.password; this._password = config.password;
} }
if (config && config.redis) { if (config && config.sentinel) {
const { const {
name, sentinelPassword, sentinels, host, port, name, sentinelPassword,
} = config.redis; } = config.sentinel;
this._redis.name = name || this._redis.name; this._sentinel.host = host || this._sentinel.host;
this._redis.sentinelPassword = sentinelPassword || this._redis.sentinelPassword; this._sentinel.port = port || this._sentinel.port;
this._redis.sentinels = sentinels || this._redis.sentinels; this._sentinel.name = name || this._sentinel.name;
this._sentinel.sentinelPassword = sentinelPassword || this._sentinel.sentinelPassword;
} }
if (config && config.bucketd) { if (config && config.bucketd) {
const { host, port } = config.bucketd; const { host, port } = config.bucketd;
@ -71,9 +68,12 @@ class UtapiReindex {
_getRedisClient() { _getRedisClient() {
const client = new RedisClient({ const client = new RedisClient({
sentinels: this._redis.sentinels, sentinels: [{
name: this._redis.name, host: this._sentinel.host,
sentinelPassword: this._redis.sentinelPassword, port: this._sentinel.port,
}],
name: this._sentinel.name,
sentinelPassword: this._sentinel.sentinelPassword,
password: this._password, password: this._password,
}); });
client.connect(); client.connect();
@ -88,16 +88,16 @@ class UtapiReindex {
return this.ds.del(REINDEX_LOCK_KEY); return this.ds.del(REINDEX_LOCK_KEY);
} }
_buildFlags(sentinel) { _buildFlags() {
const flags = { const flags = {
/* eslint-disable camelcase */ /* eslint-disable camelcase */
sentinel_ip: sentinel.host, sentinel_ip: this._sentinel.host,
sentinel_port: sentinel.port, sentinel_port: this._sentinel.port,
sentinel_cluster_name: this._redis.name, sentinel_cluster_name: this._sentinel.name,
bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`, bucketd_addr: `http://${this._bucketd.host}:${this._bucketd.port}`,
}; };
if (this._redis.sentinelPassword) { if (this._sentinel.sentinelPassword) {
flags.redis_password = this._redis.sentinelPassword; flags.redis_password = this._sentinel.sentinelPassword;
} }
/* eslint-enable camelcase */ /* eslint-enable camelcase */
@ -115,8 +115,8 @@ class UtapiReindex {
return opts; return opts;
} }
_runScriptWithSentinels(path, remainingSentinels, done) { _runScript(path, done) {
const flags = this._buildFlags(remainingSentinels.shift()); const flags = this._buildFlags();
this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`); this._requestLogger.debug(`launching subprocess ${path} with flags: ${flags}`);
const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]); const process = childProcess.spawn(REINDEX_PYTHON_INTERPRETER, [path, ...flags]);
process.stdout.on('data', data => { process.stdout.on('data', data => {
@ -143,17 +143,6 @@ class UtapiReindex {
statusCode: code, statusCode: code,
script: path, script: path,
}); });
if (code === EXIT_CODE_SENTINEL_CONNECTION) {
if (remainingSentinels.length > 0) {
this._requestLogger.info('retrying with next sentinel host', {
script: path,
});
return this._runScriptWithSentinels(path, remainingSentinels, done);
}
this._requestLogger.error('no more sentinel host to try', {
script: path,
});
}
} else { } else {
this._requestLogger.info('script exited successfully', { this._requestLogger.info('script exited successfully', {
statusCode: code, statusCode: code,
@ -164,11 +153,6 @@ class UtapiReindex {
}); });
} }
_runScript(path, done) {
const remainingSentinels = [...this._redis.sentinels];
this._runScriptWithSentinels(path, remainingSentinels, done);
}
_attemptLock(job) { _attemptLock(job) {
this._requestLogger.info('attempting to acquire the lock to begin job'); this._requestLogger.info('attempting to acquire the lock to begin job');
this._lock() this._lock()

View File

@ -14,15 +14,6 @@ class UtapiRequest {
this._datastore = null; this._datastore = null;
this._requestQuery = null; this._requestQuery = null;
this._requestPath = null; this._requestPath = null;
this._vault = null;
}
getVault() {
return this._vault;
}
setVault() {
return this._vault;
} }
/** /**

View File

@ -1,21 +1,16 @@
import argparse
import ast
from concurrent.futures import ThreadPoolExecutor
import json
import logging
import re
import redis
import requests import requests
import redis
import json
import ast
import sys import sys
from threading import Thread
import time import time
import urllib import urllib
import re
import sys
from threading import Thread
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO) import argparse
_log = logging.getLogger('utapi-reindex:reporting')
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
@ -34,19 +29,8 @@ class askRedis():
def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None): def __init__(self, ip="127.0.0.1", port="16379", sentinel_cluster_name="scality-s3", password=None):
self._password = password self._password = password
r = redis.Redis( r = redis.Redis(host=ip, port=port, db=0, password=password)
host=ip, self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
port=port,
db=0,
password=password,
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
)
try:
self._ip, self._port = r.sentinel_get_master_addr_by_name(sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {ip}:{port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
def read(self, resource, name): def read(self, resource, name):
r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password) r = redis.Redis(host=self._ip, port=self._port, db=0, password=self._password)
@ -114,4 +98,4 @@ if __name__ == '__main__':
data = U.read('accounts', userid) data = U.read('accounts', userid)
content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % ( content = "Account:%s|NumberOFfiles:%s|StorageCapacity:%s " % (
userid, data["files"], data["total_size"]) userid, data["files"], data["total_size"])
executor.submit(safe_print, content) executor.submit(safe_print, content)

View File

@ -1,6 +1,5 @@
import argparse import argparse
import concurrent.futures as futures import concurrent.futures as futures
import functools
import itertools import itertools
import json import json
import logging import logging
@ -9,7 +8,6 @@ import re
import sys import sys
import time import time
import urllib import urllib
from pathlib import Path
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
@ -25,9 +23,6 @@ MPU_SHADOW_BUCKET_PREFIX = 'mpuShadowBucket'
ACCOUNT_UPDATE_CHUNKSIZE = 100 ACCOUNT_UPDATE_CHUNKSIZE = 100
SENTINEL_CONNECT_TIMEOUT_SECONDS = 10
EXIT_CODE_SENTINEL_CONNECTION_ERROR = 100
def get_options(): def get_options():
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP") parser.add_argument("-i", "--sentinel-ip", default='127.0.0.1', help="Sentinel IP")
@ -36,38 +31,11 @@ def get_options():
parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name") parser.add_argument("-n", "--sentinel-cluster-name", default='scality-s3', help="Redis cluster name")
parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server") parser.add_argument("-s", "--bucketd-addr", default='http://127.0.0.1:9000', help="URL of the bucketd server")
parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers") parser.add_argument("-w", "--worker", default=10, type=int, help="Number of workers")
parser.add_argument("-b", "--bucket", default=None, help="Bucket to be processed")
parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request") parser.add_argument("-r", "--max-retries", default=2, type=int, help="Max retries before failing a bucketd request")
parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy") parser.add_argument("--only-latest-when-locked", action='store_true', help="Only index the latest version of a key when the bucket has a default object lock policy")
parser.add_argument("--debug", action='store_true', help="Enable debug logging") parser.add_argument("--debug", action='store_true', help="Enable debug logging")
parser.add_argument("--dry-run", action="store_true", help="Do not update redis") return parser.parse_args()
group = parser.add_mutually_exclusive_group()
group.add_argument("-a", "--account", default=[], help="account canonical ID (all account buckets will be processed)", action="append", type=nonempty_string('account'))
group.add_argument("--account-file", default=None, help="file containing account canonical IDs, one ID per line", type=existing_file)
group.add_argument("-b", "--bucket", default=[], help="bucket name", action="append", type=nonempty_string('bucket'))
group.add_argument("--bucket-file", default=None, help="file containing bucket names, one bucket name per line", type=existing_file)
options = parser.parse_args()
if options.bucket_file:
with open(options.bucket_file) as f:
options.bucket = [line.strip() for line in f if line.strip()]
elif options.account_file:
with open(options.account_file) as f:
options.account = [line.strip() for line in f if line.strip()]
return options
def nonempty_string(flag):
def inner(value):
if not value.strip():
raise argparse.ArgumentTypeError("%s: value must not be empty"%flag)
return value
return inner
def existing_file(path):
path = Path(path).resolve()
if not path.exists():
raise argparse.ArgumentTypeError("File does not exist: %s"%path)
return path
def chunks(iterable, size): def chunks(iterable, size):
it = iter(iterable) it = iter(iterable)
@ -94,10 +62,6 @@ class InvalidListing(Exception):
def __init__(self, bucket): def __init__(self, bucket):
super().__init__('Invalid contents found while listing bucket %s'%bucket) super().__init__('Invalid contents found while listing bucket %s'%bucket)
class BucketNotFound(Exception):
def __init__(self, bucket):
super().__init__('Bucket %s not found'%bucket)
class BucketDClient: class BucketDClient:
'''Performs Listing calls against bucketd''' '''Performs Listing calls against bucketd'''
@ -177,7 +141,6 @@ class BucketDClient:
else: else:
is_truncated = len(payload) > 0 is_truncated = len(payload) > 0
@functools.lru_cache(maxsize=16)
def _get_bucket_attributes(self, name): def _get_bucket_attributes(self, name):
url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name) url = self.__url_attribute_format.format(addr=self._bucketd_addr, bucket=name)
try: try:
@ -186,7 +149,7 @@ class BucketDClient:
return resp.json() return resp.json()
else: else:
_log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code)) _log.error('Error getting bucket attributes bucket:%s status_code:%s'%(name, resp.status_code))
raise BucketNotFound(name) raise InvalidListing(name)
except ValueError as e: except ValueError as e:
_log.exception(e) _log.exception(e)
_log.error('Invalid attributes response body! bucket:%s'%name) _log.error('Invalid attributes response body! bucket:%s'%name)
@ -199,15 +162,7 @@ class BucketDClient:
_log.error('Unhandled exception getting bucket attributes bucket:%s'%name) _log.error('Unhandled exception getting bucket attributes bucket:%s'%name)
raise raise
def get_bucket_md(self, name): def list_buckets(self, name = None):
md = self._get_bucket_attributes(name)
canonId = md.get('owner')
if canonId is None:
_log.error('No owner found for bucket %s'%name)
raise InvalidListing(name)
return Bucket(canonId, name, md.get('objectLockEnabled', False))
def list_buckets(self, account=None):
def get_next_marker(p): def get_next_marker(p):
if p is None: if p is None:
@ -219,24 +174,25 @@ class BucketDClient:
'maxKeys': 1000, 'maxKeys': 1000,
'marker': get_next_marker 'marker': get_next_marker
} }
if account is not None:
params['prefix'] = '%s..|..' % account
for _, payload in self._list_bucket(USERS_BUCKET, **params): for _, payload in self._list_bucket(USERS_BUCKET, **params):
buckets = [] buckets = []
for result in payload.get('Contents', []): for result in payload.get('Contents', []):
match = re.match("(\w+)..\|..(\w+.*)", result['key']) match = re.match("(\w+)..\|..(\w+.*)", result['key'])
bucket = Bucket(*match.groups(), False) bucket = Bucket(*match.groups(), False)
# We need to get the attributes for each bucket to determine if it is locked if name is None or bucket.name == name:
if self._only_latest_when_locked: # We need to get the attributes for each bucket to determine if it is locked
bucket_attrs = self._get_bucket_attributes(bucket.name) if self._only_latest_when_locked:
object_lock_enabled = bucket_attrs.get('objectLockEnabled', False) bucket_attrs = self._get_bucket_attributes(bucket.name)
bucket = bucket._replace(object_lock_enabled=object_lock_enabled) object_lock_enabled = bucket_attrs.get('objectLockEnabled', False)
buckets.append(bucket) bucket = bucket._replace(object_lock_enabled=object_lock_enabled)
buckets.append(bucket)
if buckets: if buckets:
yield buckets yield buckets
if name is not None:
# Break on the first matching bucket if a name is given
break
def list_mpus(self, bucket): def list_mpus(self, bucket):
_bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name _bucket = MPU_SHADOW_BUCKET_PREFIX + bucket.name
@ -299,7 +255,7 @@ class BucketDClient:
total_size += size total_size += size
except InvalidListing: except InvalidListing:
_log.error('Invalid contents in listing. bucket:%s'%bucket.name) _log.error('Invalid contents in listing. bucket:%s status_code:%s'%(bucket.name, status_code))
raise InvalidListing(bucket.name) raise InvalidListing(bucket.name)
return count, total_size return count, total_size
@ -368,23 +324,6 @@ class BucketDClient:
total_size=total_size total_size=total_size
) )
def list_all_buckets(bucket_client):
return bucket_client.list_buckets()
def list_specific_accounts(bucket_client, accounts):
for account in accounts:
yield from bucket_client.list_buckets(account=account)
def list_specific_buckets(bucket_client, buckets):
batch = []
for bucket in buckets:
try:
batch.append(bucket_client.get_bucket_md(bucket))
except BucketNotFound:
_log.error('Failed to list bucket %s. Removing from results.'%bucket)
continue
yield batch
def index_bucket(client, bucket): def index_bucket(client, bucket):
''' '''
@ -424,16 +363,9 @@ def get_redis_client(options):
host=options.sentinel_ip, host=options.sentinel_ip,
port=options.sentinel_port, port=options.sentinel_port,
db=0, db=0,
password=options.redis_password, password=options.redis_password
socket_connect_timeout=SENTINEL_CONNECT_TIMEOUT_SECONDS
) )
try: ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
ip, port = sentinel.sentinel_get_master_addr_by_name(options.sentinel_cluster_name)
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError) as e:
_log.error(f'Failed to connect to redis sentinel at {options.sentinel_ip}:{options.sentinel_port}: {e}')
# use a specific error code to hint on retrying with another sentinel node
sys.exit(EXIT_CODE_SENTINEL_CONNECTION_ERROR)
return redis.Redis( return redis.Redis(
host=ip, host=ip,
port=port, port=port,
@ -467,24 +399,18 @@ def log_report(resource, name, obj_count, total_size):
if __name__ == '__main__': if __name__ == '__main__':
options = get_options() options = get_options()
if options.bucket is not None and not options.bucket.strip():
print('You must provide a bucket name with the --bucket flag')
sys.exit(1)
if options.debug: if options.debug:
_log.setLevel(logging.DEBUG) _log.setLevel(logging.DEBUG)
bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked) bucket_client = BucketDClient(options.bucketd_addr, options.max_retries, options.only_latest_when_locked)
redis_client = get_redis_client(options) redis_client = get_redis_client(options)
account_reports = {} account_reports = {}
observed_buckets = set() observed_buckets = set()
failed_accounts = set() failed_accounts = set()
if options.account:
batch_generator = list_specific_accounts(bucket_client, options.account)
elif options.bucket:
batch_generator = list_specific_buckets(bucket_client, options.bucket)
else:
batch_generator = list_all_buckets(bucket_client)
with ThreadPoolExecutor(max_workers=options.worker) as executor: with ThreadPoolExecutor(max_workers=options.worker) as executor:
for batch in batch_generator: for batch in bucket_client.list_buckets(options.bucket):
bucket_reports = {} bucket_reports = {}
jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch } jobs = { executor.submit(index_bucket, bucket_client, b): b for b in batch }
for job in futures.as_completed(jobs.keys()): for job in futures.as_completed(jobs.keys()):
@ -503,84 +429,51 @@ if __name__ == '__main__':
update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size) update_report(account_reports, total.bucket.userid, total.obj_count, total.total_size)
# Bucket reports can be updated as we get them # Bucket reports can be updated as we get them
if options.dry_run: pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items(): for bucket, report in bucket_reports.items():
_log.info( update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
"DryRun: resource buckets [%s] would be updated with obj_count %i and total_size %i" % ( log_report('buckets', bucket, report['obj_count'], report['total_size'])
bucket, report['obj_count'], report['total_size']
)
)
else:
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket, report in bucket_reports.items():
update_redis(pipeline, 'buckets', bucket, report['obj_count'], report['total_size'])
log_report('buckets', bucket, report['obj_count'], report['total_size'])
pipeline.execute()
stale_buckets = set()
recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket:
stale_buckets = { b for b in options.bucket if b not in observed_buckets }
elif options.account:
_log.warning('Stale buckets will not be cleared when using the --account or --account-file flags')
else:
stale_buckets = recorded_buckets.difference(observed_buckets)
_log.info('Found %s stale buckets' % len(stale_buckets))
if options.dry_run:
_log.info("DryRun: not updating stale buckets")
else:
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute() pipeline.execute()
# Account metrics are not updated if a bucket is specified recorded_buckets = set(get_resources_from_redis(redis_client, 'buckets'))
if options.bucket: if options.bucket is None:
_log.warning('Account metrics will not be updated when using the --bucket or --bucket-file flags') stale_buckets = recorded_buckets.difference(observed_buckets)
elif observed_buckets and options.bucket not in recorded_buckets:
# The provided bucket does not exist, so clean up any metrics
stale_buckets = { options.bucket }
else: else:
stale_buckets = set()
_log.info('Found %s stale buckets' % len(stale_buckets))
for chunk in chunks(stale_buckets, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for bucket in chunk:
update_redis(pipeline, 'buckets', bucket, 0, 0)
log_report('buckets', bucket, 0, 0)
pipeline.execute()
# Account metrics are not updated if a bucket is specified
if options.bucket is None:
# Don't update any accounts with failed listings # Don't update any accounts with failed listings
without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items()) without_failed = filter(lambda x: x[0] not in failed_accounts, account_reports.items())
if options.dry_run: # Update total account reports in chunks
for userid, report in account_reports.items(): for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
_log.info( pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
"DryRun: resource account [%s] would be updated with obj_count %i and total_size %i" % ( for userid, report in chunk:
userid, report['obj_count'], report['total_size'] update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
) log_report('accounts', userid, report['obj_count'], report['total_size'])
) pipeline.execute()
else:
# Update total account reports in chunks
for chunk in chunks(without_failed, ACCOUNT_UPDATE_CHUNKSIZE):
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
for userid, report in chunk:
update_redis(pipeline, 'accounts', userid, report['obj_count'], report['total_size'])
log_report('accounts', userid, report['obj_count'], report['total_size'])
pipeline.execute()
if options.account:
for account in options.account:
if account in failed_accounts:
_log.error("No metrics updated for account %s, one or more buckets failed" % account)
# Include failed_accounts in observed_accounts to avoid clearing metrics # Include failed_accounts in observed_accounts to avoid clearing metrics
observed_accounts = failed_accounts.union(set(account_reports.keys())) observed_accounts = failed_accounts.union(set(account_reports.keys()))
recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts')) recorded_accounts = set(get_resources_from_redis(redis_client, 'accounts'))
if options.account: # Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = { a for a in options.account if a not in observed_accounts } stale_accounts = recorded_accounts.difference(observed_accounts)
else:
# Stale accounts and buckets are ones that do not appear in the listing, but have recorded values
stale_accounts = recorded_accounts.difference(observed_accounts)
_log.info('Found %s stale accounts' % len(stale_accounts)) _log.info('Found %s stale accounts' % len(stale_accounts))
if options.dry_run: for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE):
_log.info("DryRun: not updating stale accounts") pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load
else: for account in chunk:
for chunk in chunks(stale_accounts, ACCOUNT_UPDATE_CHUNKSIZE): update_redis(pipeline, 'accounts', account, 0, 0)
pipeline = redis_client.pipeline(transaction=False) # No transaction to reduce redis load log_report('accounts', account, 0, 0)
for account in chunk: pipeline.execute()
update_redis(pipeline, 'accounts', account, 0, 0)
log_report('accounts', account, 0, 0)
pipeline.execute()

View File

@ -68,10 +68,10 @@ const keys = {
*/ */
function getSchemaPrefix(params, timestamp) { function getSchemaPrefix(params, timestamp) {
const { const {
bucket, accountId, userId, level, service, location, bucket, accountId, userId, level, service,
} = params; } = params;
// `service` property must remain last because other objects also include it // `service` property must remain last because other objects also include it
const id = bucket || accountId || userId || location || service; const id = bucket || accountId || userId || service;
const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:` const prefix = timestamp ? `${service}:${level}:${timestamp}:${id}:`
: `${service}:${level}:${id}:`; : `${service}:${level}:${id}:`;
return prefix; return prefix;
@ -86,13 +86,9 @@ function getSchemaPrefix(params, timestamp) {
*/ */
function generateKey(params, metric, timestamp) { function generateKey(params, metric, timestamp) {
const prefix = getSchemaPrefix(params, timestamp); const prefix = getSchemaPrefix(params, timestamp);
if (params.location) {
return `${prefix}locationStorage`;
}
return keys[metric](prefix); return keys[metric](prefix);
} }
/** /**
* Returns a list of the counters for a metric type * Returns a list of the counters for a metric type
* @param {object} params - object with metric type and id as a property * @param {object} params - object with metric type and id as a property

View File

@ -7,6 +7,7 @@ const { Clustering, errors, ipCheck } = require('arsenal');
const arsenalHttps = require('arsenal').https; const arsenalHttps = require('arsenal').https;
const { Logger } = require('werelogs'); const { Logger } = require('werelogs');
const config = require('./Config');
const routes = require('../router/routes'); const routes = require('../router/routes');
const Route = require('../router/Route'); const Route = require('../router/Route');
const Router = require('../router/Router'); const Router = require('../router/Router');
@ -27,12 +28,7 @@ class UtapiServer {
constructor(worker, port, datastore, logger, config) { constructor(worker, port, datastore, logger, config) {
this.worker = worker; this.worker = worker;
this.port = port; this.port = port;
this.vault = config.vaultclient; this.router = new Router(config);
if (!this.vault) {
const Vault = require('./Vault');
this.vault = new Vault(config);
}
this.router = new Router(config, this.vault);
this.logger = logger; this.logger = logger;
this.datastore = datastore; this.datastore = datastore;
this.server = null; this.server = null;
@ -75,7 +71,6 @@ class UtapiServer {
req.socket.setNoDelay(); req.socket.setNoDelay();
const { query, path, pathname } = url.parse(req.url, true); const { query, path, pathname } = url.parse(req.url, true);
const utapiRequest = new UtapiRequest() const utapiRequest = new UtapiRequest()
.setVault(this.vault)
.setRequest(req) .setRequest(req)
.setLog(this.logger.newRequestLogger()) .setLog(this.logger.newRequestLogger())
.setResponse(res) .setResponse(res)
@ -219,7 +214,8 @@ class UtapiServer {
* @property {object} params.log - logger configuration * @property {object} params.log - logger configuration
* @return {undefined} * @return {undefined}
*/ */
function spawn(config) { function spawn(params) {
Object.assign(config, params);
const { const {
workers, redis, log, port, workers, redis, log, port,
} = config; } = config;

View File

@ -23,6 +23,10 @@
"healthChecks": { "healthChecks": {
"allowFrom": ["127.0.0.1/8", "::1"] "allowFrom": ["127.0.0.1/8", "::1"]
}, },
"vaultd": {
"host": "127.0.0.1",
"port": 8500
},
"cacheBackend": "memory", "cacheBackend": "memory",
"development": false, "development": false,
"nodeId": "single_node", "nodeId": "single_node",

View File

@ -2,8 +2,6 @@ const fs = require('fs');
const path = require('path'); const path = require('path');
const Joi = require('@hapi/joi'); const Joi = require('@hapi/joi');
const assert = require('assert'); const assert = require('assert');
const defaults = require('./defaults.json');
const werelogs = require('werelogs');
const { const {
truthy, envNamespace, allowedFilterFields, allowedFilterStates, truthy, envNamespace, allowedFilterFields, allowedFilterStates,
@ -73,6 +71,7 @@ class Config {
constructor(overrides) { constructor(overrides) {
this._basePath = path.join(__dirname, '../../'); this._basePath = path.join(__dirname, '../../');
this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath); this._configPath = _loadFromEnv('CONFIG_FILE', defaultConfigPath);
this._defaultsPath = path.join(__dirname, 'defaults.json');
this.host = undefined; this.host = undefined;
this.port = undefined; this.port = undefined;
@ -90,11 +89,6 @@ class Config {
parsedConfig = this._recursiveUpdate(parsedConfig, overrides); parsedConfig = this._recursiveUpdate(parsedConfig, overrides);
} }
Object.assign(this, parsedConfig); Object.assign(this, parsedConfig);
werelogs.configure({
level: Config.logging.level,
dump: Config.logging.dumpLevel,
});
} }
static _readFile(path, encoding = 'utf-8') { static _readFile(path, encoding = 'utf-8') {
@ -119,7 +113,7 @@ class Config {
} }
_loadDefaults() { _loadDefaults() {
return defaults; return Config._readJSON(this._defaultsPath);
} }
_loadUserConfig() { _loadUserConfig() {

View File

@ -1,6 +1,6 @@
/* eslint-disable no-restricted-syntax */ /* eslint-disable no-restricted-syntax */
const arsenal = require('arsenal'); const arsenal = require('arsenal');
const async = require('async');
const metadata = require('./client'); const metadata = require('./client');
const { LoggerContext, logger } = require('../utils'); const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants'); const { keyVersionSplitter } = require('../constants');
@ -12,14 +12,9 @@ const moduleLogger = new LoggerContext({
module: 'metadata.client', module: 'metadata.client',
}); });
const ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
const PAGE_SIZE = 1000; const PAGE_SIZE = 1000;
async function _listingWrapper(bucket, params) { function _listingWrapper(bucket, params) {
return new Promise( return new Promise(
(resolve, reject) => metadata.listObject( (resolve, reject) => metadata.listObject(
bucket, bucket,
@ -46,7 +41,7 @@ function _listObject(bucket, prefix, hydrateFunc) {
try { try {
// eslint-disable-next-line no-await-in-loop // eslint-disable-next-line no-await-in-loop
res = await async.retryable(ebConfig, _listingWrapper)(bucket, { ...listingParams, gt }); res = await _listingWrapper(bucket, { ...listingParams, gt });
} catch (error) { } catch (error) {
moduleLogger.error('Error during listing', { error }); moduleLogger.error('Error during listing', { error });
throw error; throw error;

View File

@ -6,8 +6,7 @@ const BackOff = require('backo');
const { whilst } = require('async'); const { whilst } = require('async');
const errors = require('./errors'); const errors = require('./errors');
const { LoggerContext } = require('./utils/log'); const { LoggerContext, asyncOrCallback } = require('./utils');
const { asyncOrCallback } = require('./utils/func');
const moduleLogger = new LoggerContext({ const moduleLogger = new LoggerContext({
module: 'redis', module: 'redis',

View File

@ -31,12 +31,6 @@ class ReindexTask extends BaseTask {
this._defaultLag = 0; this._defaultLag = 0;
const eventFilters = (config && config.filter) || {}; const eventFilters = (config && config.filter) || {};
this._shouldReindex = buildFilterChain((config && config.filter) || {}); this._shouldReindex = buildFilterChain((config && config.filter) || {});
// exponential backoff: max wait = 50 * 2 ^ 10 milliseconds ~= 51 seconds
this.ebConfig = {
times: 10,
interval: retryCount => 50 * (2 ** retryCount),
};
if (Object.keys(eventFilters).length !== 0) { if (Object.keys(eventFilters).length !== 0) {
logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters); logEventFilter((...args) => logger.info(...args), 'reindex resource filtering enabled', eventFilters);
} }
@ -164,6 +158,7 @@ class ReindexTask extends BaseTask {
if (this._program.bucket.length) { if (this._program.bucket.length) {
return this._program.bucket.map(name => ({ name })); return this._program.bucket.map(name => ({ name }));
} }
return metadata.listBuckets(); return metadata.listBuckets();
} }
@ -185,8 +180,8 @@ class ReindexTask extends BaseTask {
let mpuTotal; let mpuTotal;
try { try {
bktTotal = await async.retryable(this.ebConfig, ReindexTask._indexBucket)(bucket.name); bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(this.ebConfig, ReindexTask._indexMpuBucket)(mpuBucket); mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
} catch (error) { } catch (error) {
logger.error( logger.error(
'failed bucket reindex. any associated account will be skipped', 'failed bucket reindex. any associated account will be skipped',

View File

@ -1,6 +1,14 @@
const werelogs = require('werelogs'); const werelogs = require('werelogs');
const config = require('../config');
const { comprehend } = require('./func'); const { comprehend } = require('./func');
const loggerConfig = {
level: config.logging.level,
dump: config.logging.dumpLevel,
};
werelogs.configure(loggerConfig);
const rootLogger = new werelogs.Logger('Utapi'); const rootLogger = new werelogs.Logger('Utapi');
class LoggerContext { class LoggerContext {
@ -70,6 +78,8 @@ class LoggerContext {
} }
} }
rootLogger.debug('logger initialized', { loggerConfig });
function buildRequestLogger(req) { function buildRequestLogger(req) {
let reqUids = []; let reqUids = [];
if (req.headers['x-scal-request-uids'] !== undefined) { if (req.headers['x-scal-request-uids'] !== undefined) {

View File

@ -1,5 +1,6 @@
const assert = require('assert'); const assert = require('assert');
const { auth, policies } = require('arsenal'); const { auth, policies } = require('arsenal');
const vaultclient = require('vaultclient');
const config = require('../config'); const config = require('../config');
const errors = require('../errors'); const errors = require('../errors');
/** /**
@ -8,17 +9,9 @@ const errors = require('../errors');
*/ */
class VaultWrapper extends auth.Vault { class VaultWrapper extends auth.Vault {
create(config) {
if (config.vaultd.host) {
return new VaultWrapper(config);
}
return null;
}
constructor(options) { constructor(options) {
let client; let client;
const { host, port } = options.vaultd; const { host, port } = options.vaultd;
const vaultclient = require('vaultclient');
if (options.tls) { if (options.tls) {
const { key, cert, ca } = options.tls; const { key, cert, ca } = options.tls;
client = new vaultclient.Client(host, port, true, key, cert, client = new vaultclient.Client(host, port, true, key, cert,
@ -126,7 +119,7 @@ class VaultWrapper extends auth.Vault {
} }
} }
const vault = VaultWrapper.create(config); const vault = new VaultWrapper(config);
auth.setHandler(vault); auth.setHandler(vault);
module.exports = { module.exports = {

View File

@ -3,7 +3,7 @@
"engines": { "engines": {
"node": ">=16" "node": ">=16"
}, },
"version": "8.1.15", "version": "7.70.4",
"description": "API for tracking resource utilization and reporting metrics", "description": "API for tracking resource utilization and reporting metrics",
"main": "index.js", "main": "index.js",
"repository": { "repository": {
@ -19,12 +19,13 @@
"dependencies": { "dependencies": {
"@hapi/joi": "^17.1.1", "@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.14", "@senx/warp10": "^1.0.14",
"arsenal": "git+https://git.yourcmc.ru/vitalif/zenko-arsenal.git#development/8.1", "arsenal": "git+https://github.com/scality/Arsenal#7.10.46",
"async": "^3.2.0", "async": "^3.2.0",
"aws-sdk": "^2.1005.0", "aws-sdk": "^2.1005.0",
"aws4": "^1.8.0", "aws4": "^1.8.0",
"backo": "^1.1.0", "backo": "^1.1.0",
"body-parser": "^1.19.0", "body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient#7.10.6",
"byte-size": "^7.0.0", "byte-size": "^7.0.0",
"commander": "^5.1.0", "commander": "^5.1.0",
"cron-parser": "^2.15.0", "cron-parser": "^2.15.0",
@ -39,14 +40,15 @@
"oas-tools": "^2.2.2", "oas-tools": "^2.2.2",
"prom-client": "14.2.0", "prom-client": "14.2.0",
"uuid": "^3.3.2", "uuid": "^3.3.2",
"werelogs": "git+https://git.yourcmc.ru/vitalif/zenko-werelogs.git#development/8.1" "vaultclient": "scality/vaultclient#7.10.13",
"werelogs": "scality/werelogs#8.1.0"
}, },
"devDependencies": { "devDependencies": {
"eslint": "^8.14.0", "eslint": "6.0.1",
"eslint-config-airbnb-base": "^15.0.0", "eslint-config-airbnb": "17.1.0",
"eslint-config-scality": "git+https://git.yourcmc.ru/vitalif/zenko-eslint-config-scality.git", "eslint-config-scality": "scality/Guidelines#7.10.2",
"eslint-plugin-import": "^2.18.0", "eslint-plugin-import": "^2.18.0",
"mocha": ">=3.1.2", "mocha": "^3.0.2",
"nodemon": "^2.0.4", "nodemon": "^2.0.4",
"protobufjs": "^6.10.1", "protobufjs": "^6.10.1",
"sinon": "^9.0.2" "sinon": "^9.0.2"

View File

@ -3,16 +3,17 @@ const assert = require('assert');
const url = require('url'); const url = require('url');
const { auth, errors, policies } = require('arsenal'); const { auth, errors, policies } = require('arsenal');
const safeJsonParse = require('../utils/safeJsonParse'); const safeJsonParse = require('../utils/safeJsonParse');
const Vault = require('../lib/Vault');
class Router { class Router {
/** /**
* @constructor * @constructor
* @param {Config} config - Config instance * @param {Config} config - Config instance
*/ */
constructor(config, vault) { constructor(config) {
this._service = config.component; this._service = config.component;
this._routes = {}; this._routes = {};
this._vault = vault; this._vault = new Vault(config);
} }
/** /**
@ -265,10 +266,6 @@ class Router {
*/ */
_processSecurityChecks(utapiRequest, route, cb) { _processSecurityChecks(utapiRequest, route, cb) {
const log = utapiRequest.getLog(); const log = utapiRequest.getLog();
if (process.env.UTAPI_AUTH === 'false') {
// Zenko route request does not need to go through Vault
return this._startRequest(utapiRequest, route, cb);
}
return this._authSquared(utapiRequest, err => { return this._authSquared(utapiRequest, err => {
if (err) { if (err) {
log.trace('error from vault', { errors: err }); log.trace('error from vault', { errors: err });

View File

@ -1,21 +1,4 @@
const fs = require('fs'); const config = require('./lib/Config');
const path = require('path');
const Config = require('./lib/Config');
const server = require('./lib/server'); const server = require('./lib/server');
/* server(Object.assign({}, config, { component: 's3' }));
* By default, the config file is "config.json" at the root.
* It can be overridden using the UTAPI_CONFIG_FILE environment var.
*/
const cfgpath = process.env.UTAPI_CONFIG_FILE || (__dirname+'/config.json');
let cfg;
try {
cfg = JSON.parse(fs.readFileSync(cfgpath, { encoding: 'utf-8' }));
} catch (err) {
throw new Error(`could not parse config file: ${err.message}`);
}
cfg.component = 's3';
server(new Config(cfg));

View File

@ -170,6 +170,7 @@ describe('Test IngestShards', function () {
const results = await warp10.fetch({ const results = await warp10.fetch({
className: 'utapi.event', labels: { node: prefix }, start: start + 1, stop: -2, className: 'utapi.event', labels: { node: prefix }, start: start + 1, stop: -2,
}); });
const series = JSON.parse(results.result[0])[0]; const series = JSON.parse(results.result[0])[0];
const timestamps = series.v.map(ev => ev[0]); const timestamps = series.v.map(ev => ev[0]);
assert.deepStrictEqual([ assert.deepStrictEqual([
@ -178,8 +179,7 @@ describe('Test IngestShards', function () {
], timestamps); ], timestamps);
}); });
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65 it('should increment microseconds for several duplicate timestamps', async () => {
it.skip('should increment microseconds for several duplicate timestamps', async () => {
const start = shardFromTimestamp(getTs(-120)); const start = shardFromTimestamp(getTs(-120));
const events = generateFakeEvents(start, start + 5, 5) const events = generateFakeEvents(start, start + 5, 5)
.map(ev => { ev.timestamp = start; return ev; }); .map(ev => { ev.timestamp = start; return ev; });
@ -190,6 +190,7 @@ describe('Test IngestShards', function () {
const results = await warp10.fetch({ const results = await warp10.fetch({
className: 'utapi.event', labels: { node: prefix }, start: start + 5, stop: -5, className: 'utapi.event', labels: { node: prefix }, start: start + 5, stop: -5,
}); });
const series = JSON.parse(results.result[0])[0]; const series = JSON.parse(results.result[0])[0];
const timestamps = series.v.map(ev => ev[0]); const timestamps = series.v.map(ev => ev[0]);
assert.deepStrictEqual([ assert.deepStrictEqual([

View File

@ -177,49 +177,4 @@ describe('Test ReindexTask', function () {
assert.strictEqual(series[0].values.length, 2); assert.strictEqual(series[0].values.length, 2);
series[0].values.map(value => assert.deepStrictEqual(value, bucketRecord)); series[0].values.map(value => assert.deepStrictEqual(value, bucketRecord));
}); });
describe('exponential backoff', () => {
it('should retry when bucketd is unreachable', done => {
// disable bucketd to simulate downtime
bucketd.end();
const bucketDStub = sinon.stub(bucketd, '_getBucketResponse');
bucketDStub.onFirstCall().callsFake(
// Once the timeout promise resolves, bucketd is able to be called.
// If we make a call after 10 seconds, this shows that retries
// have been occuring during bucketd downtime.
() => {
return {
key: 'foo',
value: 'bar',
};
},
);
const reindexPromise = new Promise((resolve, reject) => {
reindexTask._execute()
.then(() => {
resolve('reindexed');
})
.catch(err => {
reject(err);
});
});
const timeoutPromise = new Promise(resolve => {
const f = () => {
bucketd.start();
resolve();
};
setTimeout(f, 10000);
});
Promise.all([reindexPromise, timeoutPromise])
.then(values => {
assert.strictEqual(values[0], 'reindexed');
sinon.restore();
done();
});
});
});
}); });

View File

@ -233,8 +233,7 @@ describe('Test UtapiClient', function () {
}); });
}); });
// please unskip this in https://scality.atlassian.net/browse/UTAPI-65 it('should get the current storage for an account using the cache', async () => {
it.skip('should get the current storage for an account using the cache', async () => {
await async.eachOf(totals.accounts, async (total, acc) => { await async.eachOf(totals.accounts, async (total, acc) => {
cacheClient.updateAccountCounterBase(acc, total.bytes); cacheClient.updateAccountCounterBase(acc, total.bytes);
}); });

View File

@ -21,9 +21,6 @@ const config = {
localCache: redisLocal, localCache: redisLocal,
component: 's3', component: 's3',
}; };
const location = 'foo-backend';
const incrby = 100;
const decrby = -30;
function isSortedSetKey(key) { function isSortedSetKey(key) {
return key.endsWith('storageUtilized') || key.endsWith('numberOfObjects'); return key.endsWith('storageUtilized') || key.endsWith('numberOfObjects');
@ -79,29 +76,6 @@ function setMockData(data, timestamp, cb) {
return cb(); return cb();
} }
function getLocationObject(bytesValue) {
const obj = {};
obj[`s3:location:${location}:locationStorage`] = `${bytesValue}`;
return obj;
}
function testLocationMetric(c, params, expected, cb) {
const { location, updateSize } = params;
if (updateSize) {
c.pushLocationMetric(location, updateSize, REQUID, err => {
assert.equal(err, null);
assert.deepStrictEqual(memoryBackend.data, expected);
return cb();
});
} else {
c.getLocationMetric(location, REQUID, (err, bytesStored) => {
assert.equal(err, null);
assert.strictEqual(bytesStored, expected);
return cb();
});
}
}
describe('UtapiClient:: enable/disable client', () => { describe('UtapiClient:: enable/disable client', () => {
it('should disable client when no redis config is provided', () => { it('should disable client when no redis config is provided', () => {
const c = new UtapiClient(); const c = new UtapiClient();
@ -832,26 +806,3 @@ tests.forEach(test => {
}); });
}); });
describe('UtapiClient:: location quota metrics', () => {
beforeEach(function beFn() {
this.currentTest.c = new UtapiClient(config);
this.currentTest.c.setDataStore(ds);
});
afterEach(() => memoryBackend.flushDb());
it('should increment location metric', function itFn(done) {
const expected = getLocationObject(incrby);
testLocationMetric(this.test.c, { location, updateSize: incrby },
expected, done);
});
it('should decrement location metric', function itFn(done) {
const expected = getLocationObject(decrby);
testLocationMetric(this.test.c, { location, updateSize: decrby },
expected, done);
});
it('should list location metric', function itFn(done) {
const expected = 0;
testLocationMetric(this.test.c, { location }, expected, done);
});
});

5283
yarn.lock Normal file

File diff suppressed because it is too large Load Diff