Compare commits
12 Commits
17225ad8c7
...
84cb2113c2
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 84cb2113c2 | |
Taylor McKinnon | 812d5a5897 | |
Taylor McKinnon | 53ba468549 | |
Taylor McKinnon | 9c58dcabea | |
Taylor McKinnon | a5c52de18d | |
Taylor McKinnon | 659fb9918c | |
Taylor McKinnon | 12afb27d75 | |
Taylor McKinnon | 309f380043 | |
Taylor McKinnon | f3f12861c1 | |
Taylor McKinnon | dd3067cf3e | |
Taylor McKinnon | 7695e7651b | |
Taylor McKinnon | 9a3cb48ad1 |
10
.eslintrc
10
.eslintrc
|
@ -3,5 +3,13 @@
|
|||
"rules": {
|
||||
"no-underscore-dangle": "off",
|
||||
"implicit-arrow-linebreak" : "off"
|
||||
}
|
||||
},
|
||||
"settings": {
|
||||
"import/resolver": {
|
||||
"node": {
|
||||
"paths": ["/backbeat/node_modules", "node_modules"]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Reindex',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.ReindexTask();
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting Reindex daemon'))
|
||||
.then(() => task.start())
|
||||
.then(() => logger.info('Reindex started'));
|
24
eve/main.yml
24
eve/main.yml
|
@ -16,14 +16,17 @@ models:
|
|||
type: kube_pod
|
||||
path: eve/workers/pod.yml
|
||||
images:
|
||||
aggressor: eve/workers/unit_and_feature_tests
|
||||
aggressor:
|
||||
context: '.'
|
||||
dockerfile: eve/workers/unit_and_feature_tests
|
||||
warp10:
|
||||
context: '.'
|
||||
dockerfile: 'images/warp10/Dockerfile'
|
||||
vault: eve/workers/mocks/vault
|
||||
- Install: &install
|
||||
name: install node modules
|
||||
command: yarn install --frozen-lockfile
|
||||
# command: yarn install --frozen-lockfile
|
||||
command: echo '@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ Skipping Install @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@'
|
||||
haltOnFailure: True
|
||||
- Upload: &upload_artifacts
|
||||
source: /artifacts
|
||||
|
@ -76,6 +79,10 @@ stages:
|
|||
- ShellCommand:
|
||||
name: run client tests
|
||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
|
||||
logfiles:
|
||||
utapi:
|
||||
filename: "/artifacts/setup_ft_test:client.log"
|
||||
follow: true
|
||||
run-server-tests:
|
||||
worker: *workspace
|
||||
steps:
|
||||
|
@ -84,6 +91,10 @@ stages:
|
|||
- ShellCommand:
|
||||
name: run server tests
|
||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
|
||||
logfiles:
|
||||
utapi:
|
||||
filename: "/artifacts/setup_ft_test:server.log"
|
||||
follow: true
|
||||
run-cron-tests:
|
||||
worker: *workspace
|
||||
steps:
|
||||
|
@ -92,6 +103,10 @@ stages:
|
|||
- ShellCommand:
|
||||
name: run cron tests
|
||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
|
||||
logfiles:
|
||||
utapi:
|
||||
filename: "/artifacts/setup_ft_test:cron.log"
|
||||
follow: true
|
||||
run-interval-tests:
|
||||
worker: *workspace
|
||||
steps:
|
||||
|
@ -100,6 +115,10 @@ stages:
|
|||
- ShellCommand:
|
||||
name: run interval tests
|
||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
|
||||
logfiles:
|
||||
utapi:
|
||||
filename: "/artifacts/setup_ft_test:interval.log"
|
||||
follow: true
|
||||
run-v2-functional-tests:
|
||||
worker:
|
||||
<< : *workspace
|
||||
|
@ -123,6 +142,7 @@ stages:
|
|||
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
|
||||
env:
|
||||
UTAPI_CACHE_BACKEND: redis
|
||||
UTAPI_LOG_LEVEL: trace
|
||||
logfiles:
|
||||
warp10:
|
||||
filename: "/artifacts/warp10.log"
|
||||
|
|
|
@ -3,10 +3,16 @@ FROM buildpack-deps:jessie-curl
|
|||
#
|
||||
# Install apt packages needed by utapi and buildbot_worker
|
||||
#
|
||||
|
||||
ENV LANG C.UTF-8
|
||||
ENV NODE_VERSION 10.22.0
|
||||
ENV PATH=$PATH:/utapi/node_modules/.bin
|
||||
ENV NODE_PATH=/utapi/node_modules
|
||||
|
||||
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
|
||||
|
||||
WORKDIR /utapi
|
||||
|
||||
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
|
||||
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
|
||||
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
|
||||
|
@ -19,7 +25,17 @@ RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x6
|
|||
&& rm -f /etc/supervisor/conf.d/*.conf \
|
||||
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
|
||||
|
||||
COPY package.json yarn.lock /utapi/
|
||||
|
||||
#
|
||||
# Install yarn dependencies
|
||||
#
|
||||
|
||||
COPY package.json yarn.lock /utapi/
|
||||
|
||||
RUN yarn cache clean \
|
||||
&& yarn install --frozen-lockfile \
|
||||
&& yarn cache clean
|
||||
#
|
||||
# Run buildbot-worker on startup through supervisor
|
||||
#
|
||||
|
|
|
@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
|
|||
SETUP_CMD="start"
|
||||
fi
|
||||
|
||||
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
|
||||
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
|
||||
bash tests/utils/wait_for_local_port.bash $PORT 40
|
||||
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"
|
||||
|
|
|
@ -490,7 +490,7 @@ class UtapiClient {
|
|||
return done();
|
||||
}),
|
||||
// if cursor is 0, it reached end of scan
|
||||
() => cursor === '0',
|
||||
cb => cb(null, cursor === '0'),
|
||||
err => callback(err, keys),
|
||||
);
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@ class MemoryCache {
|
|||
|
||||
async addToShard(shard, event) {
|
||||
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
|
||||
console.log(`MEM: ${metricKey}`)
|
||||
this._data[metricKey] = event;
|
||||
if (this._shards[shard]) {
|
||||
this._shards[shard].push(metricKey);
|
||||
|
|
|
@ -64,6 +64,7 @@ class RedisCache {
|
|||
return logger
|
||||
.logAsyncError(async () => {
|
||||
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
|
||||
console.log(metricKey)
|
||||
const shardKey = schema.getShardKey(this._prefix, shard);
|
||||
const shardMasterKey = schema.getShardMasterKey(this._prefix);
|
||||
logger.debug('adding metric to shard', { metricKey, shardKey });
|
||||
|
|
|
@ -2,6 +2,7 @@ const { callbackify } = require('util');
|
|||
const { Transform } = require('stream');
|
||||
const uuid = require('uuid');
|
||||
const needle = require('needle');
|
||||
const aws4 = require('aws4');
|
||||
|
||||
// These modules are added via the `level-mem` package rather than individually
|
||||
/* eslint-disable import/no-extraneous-dependencies */
|
||||
|
@ -9,7 +10,7 @@ const levelup = require('levelup');
|
|||
const memdown = require('memdown');
|
||||
const encode = require('encoding-down');
|
||||
const { UtapiMetric } = require('../models');
|
||||
const { LoggerContext } = require('../utils');
|
||||
const { LoggerContext, asyncOrCallback } = require('../utils');
|
||||
/* eslint-enable import/no-extraneous-dependencies */
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
|
@ -65,6 +66,7 @@ class Uploader extends Transform {
|
|||
});
|
||||
}
|
||||
}
|
||||
// const credentials = { accessKeyId, secretAccessKey, token };
|
||||
|
||||
class UtapiClient {
|
||||
constructor(config) {
|
||||
|
@ -77,6 +79,10 @@ class UtapiClient {
|
|||
this._drainTimer = null;
|
||||
this._drainCanSchedule = true;
|
||||
this._drainDelay = (config && config.drainDelay) || 30000;
|
||||
this._credentials = {
|
||||
accessKeyId: (config && config.accessKeyId) || 'accessKey1',
|
||||
secretAccessKey: (config && config.secretAccessKey) || 'verySecretKey1',
|
||||
};
|
||||
}
|
||||
|
||||
async join() {
|
||||
|
@ -259,6 +265,58 @@ class UtapiClient {
|
|||
}
|
||||
return this._pushMetric(data);
|
||||
}
|
||||
|
||||
async _signedRequest(method, url, options = {}) {
|
||||
const _url = new URL(url);
|
||||
const reqOptions = {
|
||||
method,
|
||||
hostname: _url.host,
|
||||
service: 's3',
|
||||
path: `${_url.pathname}${_url.search}`,
|
||||
signQuery: false,
|
||||
body: options.body,
|
||||
};
|
||||
|
||||
const signedOptions = aws4.sign(reqOptions, this._credentials);
|
||||
|
||||
const args = [method, url];
|
||||
if (options.body !== undefined) {
|
||||
args.push(options.body);
|
||||
}
|
||||
|
||||
args.push({
|
||||
...options,
|
||||
headers: signedOptions.headers,
|
||||
});
|
||||
|
||||
return needle(...args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the storageUtilized of a resource
|
||||
*
|
||||
* @param {string} level - level of metrics, curretly only 'accounts' is supported
|
||||
* @param {string} resource - id of the resource
|
||||
* @param {Function|undefined} callback - optional callback
|
||||
* @returns {Promise|undefined} - return a Promise if no callback is provided, undefined otherwise
|
||||
*/
|
||||
getStorage(level, resource, callback) {
|
||||
if (level !== 'accounts') {
|
||||
throw new Error('invalid level, only "accounts" is supported');
|
||||
}
|
||||
return asyncOrCallback(async () => {
|
||||
const resp = await this._signedRequest(
|
||||
'get',
|
||||
`http://${this._host}:${this._port}/v2/storage/${level}/${resource}`,
|
||||
);
|
||||
|
||||
if (resp.statusCode !== 200) {
|
||||
throw new Error(`unable to retrieve metrics: ${resp.statusMessage}`);
|
||||
}
|
||||
|
||||
return resp.body;
|
||||
}, callback);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = UtapiClient;
|
||||
|
|
|
@ -30,5 +30,10 @@
|
|||
"ingestionSchedule": "*/5 * * * * *",
|
||||
"checkpointSchedule": "*/30 * * * * *",
|
||||
"snapshotSchedule": "* 0 * * * *",
|
||||
"repairSchedule": "* */5 * * * *"
|
||||
"repairSchedule": "* */5 * * * *",
|
||||
"bucketd": [ "localhost:9000" ],
|
||||
"reindex": {
|
||||
"enabled": true,
|
||||
"schedule": "0 0 0 * * 6"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -285,6 +285,8 @@ class Config {
|
|||
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
|
||||
};
|
||||
|
||||
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
|
||||
|
||||
return parsedConfig;
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,6 @@ const warp10MultiHost = Joi.object({
|
|||
writeToken: Joi.string(),
|
||||
});
|
||||
|
||||
Joi.array().items(warp10SingleHost);
|
||||
|
||||
const tlsSchema = Joi.object({
|
||||
key: Joi.string(),
|
||||
|
@ -62,6 +61,11 @@ const schema = Joi.object({
|
|||
host: Joi.string().hostname(),
|
||||
port: Joi.number().port(),
|
||||
}),
|
||||
reindex: Joi.object({
|
||||
enabled: Joi.boolean(),
|
||||
schedule: Joi.string(),
|
||||
}),
|
||||
bucketd: Joi.array().items(Joi.string()),
|
||||
expireMetrics: Joi.boolean(),
|
||||
expireMetricsTTL: Joi.number(),
|
||||
cacheBackend: Joi.string().valid('memory', 'redis'),
|
||||
|
|
|
@ -84,13 +84,15 @@ const constants = {
|
|||
buckets: 'bck',
|
||||
},
|
||||
|
||||
warp10ValueType: ':m:utapi/event:',
|
||||
warp10EventType: ':m:utapi/event:',
|
||||
warp10RecordType: ':m:utapi/record:',
|
||||
truthy,
|
||||
shardIngestLagSecs: 30,
|
||||
checkpointLagSecs: 300,
|
||||
snapshotLagSecs: 900,
|
||||
repairLagSecs: 5,
|
||||
counterBaseValueExpiration: 86400, // 24hrs
|
||||
keyVersionSplitter: String.fromCharCode(0),
|
||||
};
|
||||
|
||||
constants.operationToResponse = constants.operations
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
|
||||
const bucketclient = require('bucketclient');
|
||||
|
||||
const config = require('../config');
|
||||
const { LoggerContext } = require('../utils');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
module: 'metadata.client',
|
||||
});
|
||||
|
||||
const params = {
|
||||
bucketdBootstrap: config.bucketd,
|
||||
https: config.https,
|
||||
};
|
||||
|
||||
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);
|
|
@ -0,0 +1,116 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal/lib/constants');
|
||||
const metadata = require('./client');
|
||||
const { LoggerContext, logger } = require('../utils');
|
||||
const { keyVersionSplitter } = require('../constants');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
module: 'metadata.client',
|
||||
});
|
||||
|
||||
const PAGE_SIZE = 1000;
|
||||
|
||||
function _listingWrapper(bucket, params) {
|
||||
return new Promise(
|
||||
(resolve, reject) => metadata.listObject(
|
||||
bucket,
|
||||
params,
|
||||
logger.newRequestLogger(),
|
||||
(err, res) => {
|
||||
if (err) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
resolve(res);
|
||||
},
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
function _listObject(bucket, prefix, hydrateFunc) {
|
||||
const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
|
||||
let gt;
|
||||
return {
|
||||
async* [Symbol.asyncIterator]() {
|
||||
while (true) {
|
||||
let res;
|
||||
|
||||
try {
|
||||
// eslint-disable-next-line no-await-in-loop
|
||||
res = await _listingWrapper(bucket, { ...listingParams, gt });
|
||||
} catch (error) {
|
||||
moduleLogger.error('Error during listing', { error });
|
||||
throw error;
|
||||
}
|
||||
|
||||
for (const item of res) {
|
||||
yield hydrateFunc ? hydrateFunc(item) : item;
|
||||
}
|
||||
|
||||
if (res.length === 0 || res.length !== PAGE_SIZE) {
|
||||
break;
|
||||
}
|
||||
|
||||
gt = res[res.length - 1].key;
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function listObjects(bucket) {
|
||||
return _listObject(bucket, '', data => {
|
||||
const { key, value } = data;
|
||||
const [name, version] = key.split(keyVersionSplitter);
|
||||
return {
|
||||
name,
|
||||
version,
|
||||
value: JSON.parse(value),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function listBuckets() {
|
||||
return _listObject(usersBucket, '', data => {
|
||||
const { key, value } = data;
|
||||
const [account, name] = key.split(mdKeySplitter);
|
||||
return {
|
||||
account,
|
||||
name,
|
||||
value: JSON.parse(value),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
async function listMPUs(bucket) {
|
||||
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
|
||||
return _listObject(mpuBucket, '', data => {
|
||||
const { key, value } = data;
|
||||
const [account, name] = key.split(mdKeySplitter);
|
||||
return {
|
||||
account,
|
||||
name,
|
||||
value: JSON.parse(value),
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
function bucketExists(bucket) {
|
||||
return new Promise((resolve, reject) => metadata.getBucketAttributes(
|
||||
bucket,
|
||||
logger.newRequestLogger(),
|
||||
err => {
|
||||
if (err && !err.NoSuchBucket) {
|
||||
reject(err);
|
||||
return;
|
||||
}
|
||||
resolve(err === null);
|
||||
},
|
||||
));
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
listBuckets,
|
||||
listObjects,
|
||||
listMPUs,
|
||||
bucketExists,
|
||||
};
|
|
@ -34,7 +34,7 @@ class RequestContext extends RequestContextModel {
|
|||
const tag = request.swagger.operation['x-router-controller'];
|
||||
const { operationId } = request.swagger.operation;
|
||||
|
||||
request.logger.addDefaultFields({
|
||||
request.logger.logger.addDefaultFields({
|
||||
tag,
|
||||
operationId,
|
||||
service: 'utapi',
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
const Joi = require('@hapi/joi');
|
||||
const { buildModel } = require('./Base');
|
||||
|
||||
const recordSchema = {
|
||||
timestamp: Joi.number(),
|
||||
objectDelta: Joi.number(),
|
||||
sizeDelta: Joi.number(),
|
||||
incomingBytes: Joi.number(),
|
||||
outgoingBytes: Joi.number(),
|
||||
};
|
||||
|
||||
module.exports = buildModel('UtapiRecord', recordSchema);
|
|
@ -1,5 +1,6 @@
|
|||
const BaseModel = require('./Base');
|
||||
const UtapiMetric = require('./UtapiMetric');
|
||||
const UtapiRecord = require('./UtapiRecord');
|
||||
const RequestContext = require('./RequestContext');
|
||||
const ResponseContainer = require('./ResponseContainer');
|
||||
|
||||
|
@ -8,4 +9,5 @@ module.exports = {
|
|||
UtapiMetric,
|
||||
RequestContext,
|
||||
ResponseContainer,
|
||||
UtapiRecord,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
const errors = require('../../../errors');
|
||||
const { serviceToWarp10Label } = require('../../../constants');
|
||||
const { client: warp10 } = require('../../../warp10');
|
||||
const { client: cache } = require('../../../cache');
|
||||
const { now } = require('../../../utils');
|
||||
const config = require('../../../config');
|
||||
|
||||
|
||||
async function getStorage(ctx, params) {
|
||||
const { level, resource } = params;
|
||||
|
||||
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
|
||||
|
||||
let storageUtilized;
|
||||
|
||||
if (base !== null) {
|
||||
storageUtilized = counter + base;
|
||||
} else {
|
||||
const labelName = serviceToWarp10Label[params.level];
|
||||
const labels = { [labelName]: resource };
|
||||
const options = {
|
||||
params: {
|
||||
end: now(),
|
||||
labels,
|
||||
node: config.nodeId,
|
||||
},
|
||||
macro: 'utapi/getMetricsAt',
|
||||
};
|
||||
const res = await warp10.exec(options);
|
||||
|
||||
if (res.result.length === 0) {
|
||||
ctx.logger.error('unable to retrieve metrics', { level, resource });
|
||||
throw errors.InternalError;
|
||||
}
|
||||
|
||||
const { sizeD: currentSize } = res.result[0];
|
||||
await cache.updateAccountCounterBase(resource, currentSize);
|
||||
storageUtilized = currentSize;
|
||||
}
|
||||
|
||||
ctx.results.statusCode = 200;
|
||||
ctx.results.body = {
|
||||
storageUtilized,
|
||||
resource,
|
||||
level,
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = getStorage;
|
|
@ -2,6 +2,7 @@ const errors = require('../../../errors');
|
|||
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
|
||||
const { convertTimestamp } = require('../../../utils');
|
||||
const { client: warp10 } = require('../../../warp10');
|
||||
const config = require('../../../config');
|
||||
|
||||
const emptyOperationsResponse = Object.values(operationToResponse)
|
||||
.reduce((prev, key) => {
|
||||
|
@ -32,6 +33,7 @@ async function listMetric(ctx, params) {
|
|||
start,
|
||||
end,
|
||||
labels,
|
||||
node: config.nodeId,
|
||||
},
|
||||
macro: 'utapi/getMetrics',
|
||||
};
|
||||
|
|
|
@ -126,19 +126,20 @@ class APIController {
|
|||
static async _callOperation(handler, request, response, params) {
|
||||
try {
|
||||
await handler(request.ctx, params);
|
||||
} catch (err) {
|
||||
request.logger.error('error during operation', { err });
|
||||
throw err;
|
||||
} catch (error) {
|
||||
console.log(error)
|
||||
request.logger.error('error during operation', { error });
|
||||
throw error;
|
||||
}
|
||||
request.logger.debug('writing operation result');
|
||||
try {
|
||||
await APIController._writeResult(request.ctx.results, response);
|
||||
} catch (err) {
|
||||
} catch (error) {
|
||||
request.logger.error(
|
||||
'error while writing operation result',
|
||||
{ err },
|
||||
{ error },
|
||||
);
|
||||
throw err;
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -85,15 +85,35 @@ async function initializeOasTools(spec, app) {
|
|||
async function authV4Middleware(request, response, params) {
|
||||
const authHeader = request.headers.authorization;
|
||||
if (!authHeader || !authHeader.startsWith('AWS4-')) {
|
||||
request.log.error('missing auth header for v4 auth');
|
||||
request.logger.error('missing auth header for v4 auth');
|
||||
throw errors.InvalidRequest.customizeDescription('Must use Auth V4 for this request.');
|
||||
}
|
||||
|
||||
let action;
|
||||
let level;
|
||||
let requestedResources = [];
|
||||
|
||||
if (params.level) {
|
||||
// Can't destructure here
|
||||
// eslint-disable-next-line prefer-destructuring
|
||||
level = params.level;
|
||||
requestedResources = [params.resource];
|
||||
action = 'ListMetrics';
|
||||
} else {
|
||||
requestedResources = params.body[params.resource];
|
||||
level = params.resource;
|
||||
action = params.Action.value;
|
||||
}
|
||||
|
||||
if (requestedResources.length === 0) {
|
||||
throw errors.InvalidRequest.customizeDescription('You must specify at lest one resource');
|
||||
}
|
||||
|
||||
let passed;
|
||||
let authorizedResources;
|
||||
|
||||
try {
|
||||
[passed, authorizedResources] = await authenticateRequest(request, params);
|
||||
[passed, authorizedResources] = await authenticateRequest(request, action, level, requestedResources);
|
||||
} catch (error) {
|
||||
request.logger.error('error during authentication', { error });
|
||||
throw errors.InternalError;
|
||||
|
@ -104,8 +124,8 @@ async function authV4Middleware(request, response, params) {
|
|||
throw errors.AccessDenied;
|
||||
}
|
||||
|
||||
if (authorizedResources !== undefined) {
|
||||
params.body[params.resource.value] = authorizedResources;
|
||||
if (params.level === undefined && authorizedResources !== undefined) {
|
||||
params.body[params.resource] = authorizedResources;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ class IngestShardTask extends BaseTask {
|
|||
return new UtapiMetric(metric);
|
||||
});
|
||||
}
|
||||
const status = await this._warp10.ingest(metricClass, records);
|
||||
const status = await this._warp10.ingest({ className: metricClass }, records);
|
||||
assert.strictEqual(status, records.length);
|
||||
await this._cache.deleteShard(shard);
|
||||
} else {
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
/* eslint-disable no-restricted-syntax */
|
||||
const async = require('async');
|
||||
const { mpuBucketPrefix } = require('arsenal/lib/constants');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const { UtapiRecord } = require('../models');
|
||||
const config = require('../config');
|
||||
const metadata = require('../metadata');
|
||||
const { serviceToWarp10Label, warp10RecordType } = require('../constants');
|
||||
|
||||
const { LoggerContext, convertTimestamp } = require('../utils');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'ReindexTask',
|
||||
});
|
||||
|
||||
class ReindexTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super({
|
||||
warp10: {
|
||||
requestTimeout: 30000,
|
||||
connectTimeout: 30000,
|
||||
},
|
||||
...options,
|
||||
});
|
||||
this._defaultSchedule = config.reindexSchedule;
|
||||
this._defaultLag = 0;
|
||||
}
|
||||
|
||||
static async _indexBucket(bucket) {
|
||||
let size = 0;
|
||||
let count = 0;
|
||||
let lastMaster = null;
|
||||
let lastMasterSize = null;
|
||||
|
||||
for await (const obj of metadata.listObjects(bucket)) {
|
||||
if (obj.value.isDeleteMarker) {
|
||||
// eslint-disable-next-line no-continue
|
||||
continue;
|
||||
}
|
||||
count += 1;
|
||||
size += obj.value['content-length'];
|
||||
|
||||
// If versioned, subtract the size of the master to avoid double counting
|
||||
if (lastMaster && obj.name === lastMaster) {
|
||||
logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
|
||||
size -= lastMasterSize;
|
||||
count -= 1;
|
||||
lastMaster = null;
|
||||
lastMasterSize = null;
|
||||
// Only save master versions
|
||||
} else if (!obj.version) {
|
||||
lastMaster = obj.name;
|
||||
lastMasterSize = obj.value['content-length'];
|
||||
}
|
||||
}
|
||||
|
||||
return { size, count };
|
||||
}
|
||||
|
||||
static async _indexMpuBucket(bucket) {
|
||||
if (await metadata.bucketExists(bucket)) {
|
||||
return ReindexTask._indexBucket(bucket);
|
||||
}
|
||||
|
||||
return { size: 0, count: 0 };
|
||||
}
|
||||
|
||||
async _fetchCurrentMetrics(level, resource) {
|
||||
const timestamp = convertTimestamp(new Date().getTime());
|
||||
const options = {
|
||||
params: {
|
||||
start: timestamp,
|
||||
end: timestamp,
|
||||
node: this._nodeId,
|
||||
labels: {
|
||||
[level]: resource,
|
||||
},
|
||||
},
|
||||
macro: 'utapi/getMetrics',
|
||||
};
|
||||
const res = await this._warp10.exec(options);
|
||||
return { timestamp, value: JSON.parse(res.result[0]) };
|
||||
}
|
||||
|
||||
async _updateMetric(level, resource, total) {
|
||||
const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
|
||||
|
||||
const objectDelta = total.count - value.numberOfObjects[0];
|
||||
const sizeDelta = total.size - value.storageUtilized[0];
|
||||
|
||||
if (objectDelta !== 0 || sizeDelta !== 0) {
|
||||
logger.info('discrepancy detected in metrics, writing corrective record',
|
||||
{ [level]: resource, objectDelta, sizeDelta });
|
||||
|
||||
const record = new UtapiRecord({
|
||||
objectDelta,
|
||||
sizeDelta,
|
||||
timestamp,
|
||||
});
|
||||
|
||||
await this._warp10.ingest(
|
||||
{
|
||||
className: 'utapi.repair.reindex',
|
||||
labels: {
|
||||
[level]: resource,
|
||||
},
|
||||
valueType: warp10RecordType,
|
||||
},
|
||||
[record],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async _execute() {
|
||||
logger.debug('reindexing objects');
|
||||
|
||||
const accountTotals = {};
|
||||
const ignoredAccounts = new Set();
|
||||
|
||||
await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
|
||||
logger.trace('starting reindex of bucket', { bucket: bucket.name });
|
||||
|
||||
const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
|
||||
let bktTotal;
|
||||
let mpuTotal;
|
||||
|
||||
try {
|
||||
bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
|
||||
mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
|
||||
} catch (error) {
|
||||
logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
|
||||
ignoredAccounts.add(bucket.account);
|
||||
return;
|
||||
}
|
||||
|
||||
const total = {
|
||||
size: bktTotal.size + mpuTotal.size,
|
||||
count: bktTotal.count + mpuTotal.count,
|
||||
};
|
||||
|
||||
if (accountTotals[bucket.account]) {
|
||||
accountTotals[bucket.account].size += total.size;
|
||||
accountTotals[bucket.account].count += total.count;
|
||||
} else {
|
||||
accountTotals[bucket.account] = { ...total };
|
||||
}
|
||||
|
||||
logger.trace('finished indexing bucket', { bucket: bucket.name });
|
||||
|
||||
await Promise.all([
|
||||
this._updateMetric(
|
||||
serviceToWarp10Label.buckets,
|
||||
bucket.name,
|
||||
bktTotal,
|
||||
),
|
||||
this._updateMetric(
|
||||
serviceToWarp10Label.buckets,
|
||||
mpuBucket,
|
||||
mpuTotal,
|
||||
)]);
|
||||
});
|
||||
|
||||
const toUpdate = Object.entries(accountTotals)
|
||||
.filter(([account]) => !ignoredAccounts.has(account));
|
||||
|
||||
await async.eachLimit(toUpdate, 5, async ([account, total]) =>
|
||||
this._updateMetric(
|
||||
serviceToWarp10Label.accounts,
|
||||
account,
|
||||
total,
|
||||
));
|
||||
|
||||
logger.debug('finished reindexing');
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = ReindexTask;
|
|
@ -3,6 +3,7 @@ const IngestShard = require('./IngestShard');
|
|||
const CreateCheckpoint = require('./CreateCheckpoint');
|
||||
const CreateSnapshot = require('./CreateSnapshot');
|
||||
const RepairTask = require('./Repair');
|
||||
const ReindexTask = require('./Reindex');
|
||||
|
||||
module.exports = {
|
||||
IngestShard,
|
||||
|
@ -10,4 +11,5 @@ module.exports = {
|
|||
CreateCheckpoint,
|
||||
CreateSnapshot,
|
||||
RepairTask,
|
||||
ReindexTask,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
const { callbackify } = require('util');
|
||||
|
||||
/**
|
||||
* Convenience function to handle "if no callback then return a promise" pattern
|
||||
*
|
||||
* @param {Function} asyncFunc - asyncFunction to call
|
||||
* @param {Function|undefined} callback - optional callback
|
||||
* @returns {Promise|undefined} - returns a Promise if no callback is passed
|
||||
*/
|
||||
function asyncOrCallback(asyncFunc, callback) {
|
||||
if (typeof callback === 'function') {
|
||||
callbackify(asyncFunc)(callback);
|
||||
return undefined;
|
||||
}
|
||||
return asyncFunc();
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
asyncOrCallback,
|
||||
};
|
|
@ -1,9 +1,11 @@
|
|||
const log = require('./log');
|
||||
const shard = require('./shard');
|
||||
const timestamp = require('./timestamp');
|
||||
const func = require('./func');
|
||||
|
||||
module.exports = {
|
||||
...log,
|
||||
...shard,
|
||||
...timestamp,
|
||||
...func,
|
||||
};
|
||||
|
|
|
@ -11,51 +11,67 @@ werelogs.configure(loggerConfig);
|
|||
const rootLogger = new werelogs.Logger('Utapi');
|
||||
|
||||
class LoggerContext {
|
||||
constructor(defaults) {
|
||||
constructor(defaults, logger = null) {
|
||||
this._defaults = defaults;
|
||||
this.logger = logger !== null ? logger : rootLogger;
|
||||
}
|
||||
|
||||
get defaults() {
|
||||
return this._defaults || {};
|
||||
}
|
||||
|
||||
static expandError(data) {
|
||||
if (data && data.error) {
|
||||
return { ...data, errmsg: data.error.message, stack: data.error.stack };
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
_collectData(data) {
|
||||
return { ...this.defaults, ...LoggerContext.expandError(data) };
|
||||
}
|
||||
|
||||
with(extraDefaults) {
|
||||
return new LoggerContext({ ...this.defaults, ...extraDefaults });
|
||||
return new LoggerContext({ ...this.defaults, ...extraDefaults }, this.logger);
|
||||
}
|
||||
|
||||
withLogger(logger) {
|
||||
return new LoggerContext({ ...this.defaults }, logger);
|
||||
}
|
||||
|
||||
info(msg, data = {}) {
|
||||
return rootLogger.info(msg, { ...this.defaults, ...data });
|
||||
return this.logger.info(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
debug(msg, data = {}) {
|
||||
return rootLogger.debug(msg, { ...this.defaults, ...data });
|
||||
return this.logger.debug(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
trace(msg, data = {}) {
|
||||
return rootLogger.trace(msg, { ...this.defaults, ...data });
|
||||
return this.logger.trace(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
warn(msg, data = {}) {
|
||||
return rootLogger.warn(msg, { ...this.defaults, ...data });
|
||||
return this.logger.warn(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
error(msg, data = {}) {
|
||||
let _data = data;
|
||||
if (data && data.error) {
|
||||
_data = { ...data, errmsg: data.error.message, stack: data.error.stack };
|
||||
}
|
||||
return rootLogger.error(msg, { ...this.defaults, ..._data });
|
||||
return this.logger.error(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
fatal(msg, data = {}) {
|
||||
return rootLogger.fatal(msg, { ...this.defaults, ...data });
|
||||
return this.logger.fatal(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
end(msg, data = {}) {
|
||||
return this.logger.end(msg, this._collectData(data));
|
||||
}
|
||||
|
||||
async logAsyncError(func, msg, data = {}) {
|
||||
try {
|
||||
return await func();
|
||||
} catch (error) {
|
||||
this.error(msg, { error, ...data });
|
||||
this.error(msg, { ...data, error });
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
@ -83,7 +99,7 @@ function buildRequestLogger(req) {
|
|||
};
|
||||
|
||||
reqLogger.addDefaultFields(defaultInfo);
|
||||
return reqLogger;
|
||||
return new LoggerContext({}, reqLogger);
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
@ -32,7 +32,18 @@ class InterpolatedClock {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current time as
|
||||
* the number of microseconds since the epoch
|
||||
*
|
||||
* @returns {Number} - current timestamp
|
||||
*/
|
||||
function now() {
|
||||
return new Date().getTime() * 1000;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
convertTimestamp,
|
||||
InterpolatedClock,
|
||||
now,
|
||||
};
|
||||
|
|
|
@ -85,19 +85,19 @@ class Vault {
|
|||
const handler = new Vault(config);
|
||||
auth.setHandler(handler);
|
||||
|
||||
async function authenticateRequest(request, params) {
|
||||
async function authenticateRequest(request, action, level, resources) {
|
||||
const policyContext = new policies.RequestContext(
|
||||
request.headers,
|
||||
params.resource,
|
||||
params.body[params.resource],
|
||||
level,
|
||||
resources,
|
||||
request.ip,
|
||||
request.ctx.encrypted,
|
||||
params.Action.value,
|
||||
action,
|
||||
'utapi',
|
||||
);
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
auth.server.doAuth(request, request.logger, (err, res) => {
|
||||
auth.server.doAuth(request, request.logger.logger, (err, res) => {
|
||||
if (err && (err.InvalidAccessKeyId || err.AccessDenied)) {
|
||||
resolve([false]);
|
||||
return;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
const { Warp10 } = require('@senx/warp10');
|
||||
const { eventFieldsToWarp10, warp10ValueType } = require('./constants');
|
||||
const assert = require('assert');
|
||||
const { eventFieldsToWarp10, warp10EventType } = require('./constants');
|
||||
const _config = require('./config');
|
||||
const { LoggerContext } = require('./utils');
|
||||
const errors = require('./errors');
|
||||
|
@ -44,29 +45,39 @@ class Warp10Client {
|
|||
// eslint-disable-next-line no-await-in-loop
|
||||
return await func(client, ...params);
|
||||
} catch (error) {
|
||||
moduleLogger.warn('error during warp10 operation, failing over to next host', { error });
|
||||
moduleLogger.warn('error during warp10 operation, failing over to next host',
|
||||
{ statusCode: error.statusCode, statusMessage: error.statusMessage, error });
|
||||
}
|
||||
}
|
||||
moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
|
||||
throw errors.InternalError;
|
||||
}
|
||||
|
||||
static _packEvent(event) {
|
||||
static _packEvent(valueType, event) {
|
||||
const packed = Object.entries(event.getValue())
|
||||
.filter(([key]) => eventFieldsToWarp10[key])
|
||||
.map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
|
||||
.join(' ');
|
||||
return `${warp10ValueType}{ ${packed} }`;
|
||||
return `${valueType}{ ${packed} }`;
|
||||
}
|
||||
|
||||
_buildGTSEntry(className, event) {
|
||||
const labels = this._clients[0].formatLabels({ node: this._nodeId });
|
||||
const packed = Warp10Client._packEvent(event);
|
||||
return `${event.timestamp}// ${className}${labels} ${packed}`;
|
||||
_buildGTSEntry(className, valueType, labels, event) {
|
||||
const _labels = this._clients[0].formatLabels({ node: this._nodeId, ...labels });
|
||||
const packed = Warp10Client._packEvent(valueType, event);
|
||||
return `${event.timestamp}// ${className}${_labels} ${packed}`;
|
||||
}
|
||||
|
||||
async _ingest(warp10, className, events) {
|
||||
const payload = events.map(ev => this._buildGTSEntry(className, ev));
|
||||
async _ingest(warp10, metadata, events) {
|
||||
const { className, valueType, labels } = metadata;
|
||||
assert.notStrictEqual(className, undefined, 'you must provide a className');
|
||||
const payload = events.map(
|
||||
ev => this._buildGTSEntry(
|
||||
className,
|
||||
valueType || warp10EventType,
|
||||
labels || {},
|
||||
ev,
|
||||
),
|
||||
);
|
||||
const res = await warp10.update(this._writeToken, payload);
|
||||
return res.count;
|
||||
}
|
||||
|
|
39
openapi.yaml
39
openapi.yaml
|
@ -84,7 +84,31 @@ components:
|
|||
type: string
|
||||
message:
|
||||
type: string
|
||||
utapi-get-storage-v1:
|
||||
description: storageUtilized for a single resource
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
type: object
|
||||
required:
|
||||
- storageUtilized
|
||||
- resource
|
||||
- level
|
||||
properties:
|
||||
storageUtilized:
|
||||
type: integer
|
||||
resource:
|
||||
type: string
|
||||
level:
|
||||
type: string
|
||||
parameters:
|
||||
level:
|
||||
in: path
|
||||
name: level
|
||||
required: true
|
||||
schema:
|
||||
type: string
|
||||
enum: [ 'accounts' ]
|
||||
resource:
|
||||
in: path
|
||||
name: resource
|
||||
|
@ -125,7 +149,20 @@ paths:
|
|||
application/json:
|
||||
schema:
|
||||
$ref: '#/components/schemas/utapi-metric-v1'
|
||||
|
||||
/v2/storage/{level}/{resource}:
|
||||
parameters:
|
||||
- $ref: '#/components/parameters/resource'
|
||||
- $ref: '#/components/parameters/level'
|
||||
get:
|
||||
x-router-controller: metrics
|
||||
x-authv4: true
|
||||
description: Get current storage utilized
|
||||
operationId: getStorage
|
||||
responses:
|
||||
default:
|
||||
$ref: '#/components/responses/json-error'
|
||||
200:
|
||||
$ref: '#/components/responses/utapi-get-storage-v1'
|
||||
/{resource}:
|
||||
parameters:
|
||||
- $ref: '#/components/parameters/resource'
|
||||
|
|
|
@ -19,9 +19,10 @@
|
|||
"dependencies": {
|
||||
"@hapi/joi": "^17.1.1",
|
||||
"@senx/warp10": "^1.0.10",
|
||||
"arsenal": "scality/Arsenal#32c895b",
|
||||
"async": "^2.0.1",
|
||||
"arsenal": "scality/Arsenal#aa9c9e5",
|
||||
"async": "^3.2.0",
|
||||
"body-parser": "^1.19.0",
|
||||
"bucketclient": "scality/bucketclient",
|
||||
"commander": "^5.1.0",
|
||||
"cron-parser": "^2.15.0",
|
||||
"express": "^4.17.1",
|
||||
|
@ -62,6 +63,7 @@
|
|||
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
|
||||
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
||||
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
|
||||
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
|
||||
"start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
|
||||
"start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
|
||||
}
|
||||
|
|
|
@ -201,7 +201,7 @@ describe('UtapiReindex', () => {
|
|||
shouldLeave = res === value;
|
||||
return setTimeout(next, 200);
|
||||
}),
|
||||
() => shouldLeave, cb);
|
||||
next => next(null, shouldLeave), cb);
|
||||
}
|
||||
|
||||
function checkMetrics({ resource, expected }, cb) {
|
||||
|
|
|
@ -63,7 +63,7 @@ async function listMetrics(level, resources, start, end) {
|
|||
}
|
||||
|
||||
async function ingestEvents(events) {
|
||||
return events.length === await warp10.ingest('utapi.event', events);
|
||||
return events.length === await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
}
|
||||
|
||||
function opsToResp(operations) {
|
||||
|
|
|
@ -59,7 +59,7 @@ describe('Test CreateCheckpoint', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(0));
|
||||
|
||||
const results = await warp10.fetch({
|
||||
|
@ -88,8 +88,8 @@ describe('Test CreateCheckpoint', function () {
|
|||
accounts,
|
||||
);
|
||||
|
||||
await warp10.ingest('utapi.event', historicalEvents);
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, historicalEvents);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-400));
|
||||
await checkpointTask._execute(getTs(0));
|
||||
|
||||
|
@ -109,7 +109,7 @@ describe('Test CreateCheckpoint', function () {
|
|||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
||||
);
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-100));
|
||||
|
||||
let results = await warp10.fetch({
|
||||
|
|
|
@ -67,7 +67,7 @@ describe('Test CreateSnapshot', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-1));
|
||||
await snapshotTask._execute(getTs(0));
|
||||
|
||||
|
@ -86,7 +86,7 @@ describe('Test CreateSnapshot', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 500,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-400));
|
||||
await checkpointTask._execute(getTs(-300));
|
||||
await checkpointTask._execute(getTs(-200));
|
||||
|
@ -110,7 +110,7 @@ describe('Test CreateSnapshot', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 500,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-300));
|
||||
await snapshotTask._execute(getTs(-250));
|
||||
|
||||
|
@ -132,11 +132,11 @@ describe('Test CreateSnapshot', function () {
|
|||
const stop = getTs(-50);
|
||||
const { events, totals } = generateCustomEvents(start, stop, 500, accounts);
|
||||
|
||||
await warp10.ingest('utapi.event', events);
|
||||
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||
await checkpointTask._execute(getTs(-1));
|
||||
|
||||
const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
|
||||
await warp10.ingest('utapi.event', newEvents);
|
||||
await warp10.ingest({ className: 'utapi.event' }, newEvents);
|
||||
await checkpointTask._execute(getTs(100));
|
||||
|
||||
await snapshotTask._execute(getTs(0));
|
||||
|
@ -156,7 +156,7 @@ describe('Test CreateSnapshot', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.repair.event', events);
|
||||
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||
await repairTask._execute(getTs(-1));
|
||||
await snapshotTask._execute(getTs(0));
|
||||
|
||||
|
|
|
@ -0,0 +1,89 @@
|
|||
const assert = require('assert');
|
||||
const uuid = require('uuid');
|
||||
const { mpuBucketPrefix } = require('arsenal/lib/constants');
|
||||
|
||||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const { ReindexTask } = require('../../../../libV2/tasks');
|
||||
const { BucketD, values } = require('../../../utils/mock/');
|
||||
const { protobuf } = require('../../../utils/v2Data');
|
||||
|
||||
const { CANONICAL_ID, BUCKET_NAME } = values;
|
||||
const bucketCounts = [1, 1001];
|
||||
|
||||
const bucketRecord = {
|
||||
ops: {},
|
||||
sizeD: 1024,
|
||||
objD: 1,
|
||||
inB: 0,
|
||||
outB: 0,
|
||||
};
|
||||
|
||||
const accountRecord = {
|
||||
ops: {},
|
||||
sizeD: 2048,
|
||||
objD: 2,
|
||||
inB: 0,
|
||||
outB: 0,
|
||||
};
|
||||
|
||||
// eslint-disable-next-line func-names
|
||||
describe('Test ReindexTask', function () {
|
||||
this.timeout(1200000);
|
||||
|
||||
let prefix;
|
||||
let warp10;
|
||||
let reindexTask;
|
||||
const bucketd = new BucketD(true);
|
||||
|
||||
before(() => bucketd.start());
|
||||
|
||||
beforeEach(() => {
|
||||
prefix = uuid.v4();
|
||||
reindexTask = new ReindexTask({ warp10: { nodeId: prefix } });
|
||||
reindexTask._program = { nodeId: prefix };
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
bucketd.reset();
|
||||
});
|
||||
|
||||
async function assertResult(labels, value) {
|
||||
const results = await warp10.fetch({
|
||||
className: 'utapi.repair.reindex',
|
||||
labels,
|
||||
start: 'now',
|
||||
stop: -100,
|
||||
});
|
||||
|
||||
assert.strictEqual(results.result.length, 1);
|
||||
assert.notStrictEqual(results.result[0], '');
|
||||
const series = JSON.parse(results.result[0]);
|
||||
assert.strictEqual(series.length, 1);
|
||||
const record = protobuf.decode('Record', series[0].v[0][1]);
|
||||
assert.deepStrictEqual(record, value);
|
||||
}
|
||||
|
||||
bucketCounts.forEach(count => {
|
||||
it(`should reindex bucket listing with a length of ${count}`, async () => {
|
||||
const bucket = `${BUCKET_NAME}-${count}`;
|
||||
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
|
||||
bucketd
|
||||
.setBucketContent({
|
||||
bucketName: bucket,
|
||||
contentLength: 1024,
|
||||
})
|
||||
.setBucketContent({
|
||||
bucketName: mpuBucket,
|
||||
contentLength: 1024,
|
||||
})
|
||||
.setBucketCount(count)
|
||||
.createBuckets();
|
||||
|
||||
await reindexTask._execute();
|
||||
await assertResult({ bck: bucket, node: prefix }, bucketRecord);
|
||||
await assertResult({ bck: mpuBucket, node: prefix }, bucketRecord);
|
||||
await assertResult({ acc: CANONICAL_ID, node: prefix }, accountRecord);
|
||||
});
|
||||
});
|
||||
});
|
|
@ -60,7 +60,7 @@ describe('Test Repair', function () {
|
|||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||
|
||||
await warp10.ingest('utapi.repair.event', events);
|
||||
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||
await repairTask._execute(getTs(0));
|
||||
|
||||
const results = await warp10.fetch({
|
||||
|
@ -89,8 +89,8 @@ describe('Test Repair', function () {
|
|||
accounts,
|
||||
);
|
||||
|
||||
await warp10.ingest('utapi.repair.event', historicalEvents);
|
||||
await warp10.ingest('utapi.repair.event', events);
|
||||
await warp10.ingest({ className: 'utapi.repair.event' }, historicalEvents);
|
||||
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||
await repairTask._execute(getTs(-400));
|
||||
await repairTask._execute(getTs(0));
|
||||
|
||||
|
@ -110,7 +110,7 @@ describe('Test Repair', function () {
|
|||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
||||
);
|
||||
|
||||
await warp10.ingest('utapi.repair.event', events);
|
||||
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||
await repairTask._execute(getTs(-100));
|
||||
|
||||
let results = await warp10.fetch({
|
||||
|
|
|
@ -1,24 +1,49 @@
|
|||
const assert = require('assert');
|
||||
const async = require('async');
|
||||
const sinon = require('sinon');
|
||||
|
||||
const { generateFakeEvents } = require('../../utils/v2Data');
|
||||
const uuid = require('uuid');
|
||||
|
||||
const UtapiClient = require('../../../libV2/client');
|
||||
const config = require('../../../libV2/config');
|
||||
const { CacheClient, backends: cacheBackends } = require('../../../libV2/cache');
|
||||
const { IngestShard } = require('../../../libV2/tasks');
|
||||
const { generateCustomEvents } = require('../../utils/v2Data');
|
||||
|
||||
const events = generateFakeEvents(1, 50, 50);
|
||||
const getClient = prefix => new CacheClient({
|
||||
cacheBackend: new cacheBackends.RedisCache(
|
||||
config.cache,
|
||||
prefix,
|
||||
),
|
||||
counterBackend: new cacheBackends.RedisCache(
|
||||
config.cache,
|
||||
prefix,
|
||||
),
|
||||
});
|
||||
|
||||
const { events, totals } = generateCustomEvents(1, 50, 50, {
|
||||
[uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
|
||||
});
|
||||
|
||||
// Unskip after server side support is added
|
||||
// eslint-disable-next-line func-names
|
||||
describe.skip('Test UtapiClient', function () {
|
||||
describe('Test UtapiClient', function () {
|
||||
this.timeout(10000);
|
||||
let client;
|
||||
let sandbox;
|
||||
|
||||
let events;
|
||||
let total;
|
||||
|
||||
beforeEach(() => {
|
||||
sandbox = sinon.createSandbox();
|
||||
client = new UtapiClient({
|
||||
drainDelay: 5000,
|
||||
});
|
||||
const { events: _events, totals: _total } = generateCustomEvents(1, 50, 50, {
|
||||
[uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
|
||||
});
|
||||
|
||||
events = _events;
|
||||
totals = _totals;
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
|
@ -52,5 +77,35 @@ describe.skip('Test UtapiClient', function () {
|
|||
}, 6000);
|
||||
});
|
||||
});
|
||||
|
||||
describe.only('Test getStorage', () => {
|
||||
let prefix;
|
||||
let cacheClient;
|
||||
let ingestTask;
|
||||
|
||||
beforeEach(async () => {
|
||||
prefix = uuid.v4();
|
||||
cacheClient = getClient(prefix);
|
||||
await cacheClient.connect();
|
||||
|
||||
ingestTask = new IngestShard();
|
||||
ingestTask._cache._cacheBackend._prefix = prefix;
|
||||
console.log(ingestTask._cache._cacheBackend)
|
||||
ingestTask._program = { lag: 0 };
|
||||
await ingestTask._cache.connect();
|
||||
console.log(ingestTask._cache._cacheBackend)
|
||||
});
|
||||
|
||||
it('should get the current value from warp10 if cache is empty', async () => {
|
||||
await Promise.all(events.map(ev => cacheClient.pushMetric(ev)));
|
||||
await ingestTask.execute();
|
||||
|
||||
await async.eachOf(totals.accounts, async (total, acc) => {
|
||||
const resp = await client.getStorage('accounts', acc);
|
||||
console.log(total);
|
||||
console.log(resp);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
|
@ -19,13 +19,13 @@ describe('Test Warp Client', () => {
|
|||
});
|
||||
|
||||
it('should ingest records', async () => {
|
||||
const res = await warp10.ingest(className, testValues);
|
||||
const res = await warp10.ingest({ className }, testValues);
|
||||
assert.strictEqual(res, testValues.length);
|
||||
});
|
||||
|
||||
// TODO after the macro encoder is written this will need to be updated
|
||||
it('should fetch records', async () => {
|
||||
await warp10.ingest(className, testValues);
|
||||
await warp10.ingest({ className }, testValues);
|
||||
const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
|
||||
const parsed = JSON.parse(res.result[0])[0];
|
||||
assert.strictEqual(parsed.c, className);
|
||||
|
|
|
@ -7,14 +7,15 @@ const { models, constants } = require('arsenal');
|
|||
const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
|
||||
|
||||
const { ObjectMD } = models;
|
||||
const app = express();
|
||||
|
||||
class BucketD {
|
||||
constructor() {
|
||||
constructor(isV2 = false) {
|
||||
this._server = null;
|
||||
this._bucketCount = 0;
|
||||
this._bucketContent = {};
|
||||
this._buckets = [];
|
||||
this._isV2 = isV2;
|
||||
this._app = express();
|
||||
}
|
||||
|
||||
clearBuckets() {
|
||||
|
@ -61,14 +62,14 @@ class BucketD {
|
|||
CommonPrefixes: [],
|
||||
};
|
||||
const maxKeys = parseInt(req.query.maxKeys, 10);
|
||||
if (req.query.marker) {
|
||||
if (req.query.marker || req.query.gt) {
|
||||
body.IsTruncated = false;
|
||||
body.Contents = this._buckets.slice(maxKeys);
|
||||
} else {
|
||||
body.IsTruncated = maxKeys < this._bucketCount;
|
||||
body.Contents = this._buckets.slice(0, maxKeys);
|
||||
}
|
||||
return JSON.stringify(body);
|
||||
return body;
|
||||
}
|
||||
|
||||
_getShadowBucketResponse(bucketName) {
|
||||
|
@ -77,7 +78,7 @@ class BucketD {
|
|||
IsTruncated: false,
|
||||
Contents: this._bucketContent[bucketName] || [],
|
||||
};
|
||||
return JSON.stringify(body);
|
||||
return body;
|
||||
}
|
||||
|
||||
_getBucketResponse(bucketName) {
|
||||
|
@ -86,7 +87,7 @@ class BucketD {
|
|||
IsTruncated: false,
|
||||
Contents: this._bucketContent[bucketName] || [],
|
||||
};
|
||||
return JSON.stringify(body);
|
||||
return body;
|
||||
}
|
||||
|
||||
_getShadowBucketOverviewResponse(bucketName) {
|
||||
|
@ -99,11 +100,11 @@ class BucketD {
|
|||
IsTruncated: false,
|
||||
Uploads: mpus,
|
||||
};
|
||||
return JSON.stringify(body);
|
||||
return body;
|
||||
}
|
||||
|
||||
_initiateRoutes() {
|
||||
app.param('bucketName', (req, res, next, bucketName) => {
|
||||
this._app.param('bucketName', (req, res, next, bucketName) => {
|
||||
/* eslint-disable no-param-reassign */
|
||||
if (bucketName === constants.usersBucket) {
|
||||
req.body = this._getUsersBucketResponse(req);
|
||||
|
@ -115,26 +116,55 @@ class BucketD {
|
|||
) {
|
||||
req.body = this._getBucketResponse(bucketName);
|
||||
}
|
||||
|
||||
// v2 reindex uses `Basic` listing type for everything
|
||||
if (this._isV2) {
|
||||
if (req.body && req.body.Contents) {
|
||||
req.body = req.body.Contents;
|
||||
}
|
||||
}
|
||||
|
||||
/* eslint-enable no-param-reassign */
|
||||
next();
|
||||
});
|
||||
|
||||
app.get('/default/bucket/:bucketName', (req, res) => {
|
||||
res.writeHead(200);
|
||||
res.write(req.body);
|
||||
res.end();
|
||||
this._app.get('/default/attributes/:bucketName', (req, res) => {
|
||||
const key = req.params.bucketName;
|
||||
const bucket = this._bucketContent[key];
|
||||
if (bucket) {
|
||||
res.status(200).send({
|
||||
name: key,
|
||||
owner: CANONICAL_ID,
|
||||
ownerDisplayName: 'steve',
|
||||
creationDate: new Date(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
res.statusMessage = 'DBNotFound';
|
||||
res.status(404).end();
|
||||
});
|
||||
|
||||
|
||||
this._app.get('/default/bucket/:bucketName', (req, res) => {
|
||||
res.status(200).send(req.body);
|
||||
});
|
||||
}
|
||||
|
||||
start() {
|
||||
this._initiateRoutes();
|
||||
const port = 9000;
|
||||
this._server = http.createServer(app).listen(port);
|
||||
this._server = http.createServer(this._app).listen(port);
|
||||
}
|
||||
|
||||
end() {
|
||||
this._server.close();
|
||||
}
|
||||
|
||||
reset() {
|
||||
this._bucketCount = 0;
|
||||
this._bucketContent = {};
|
||||
this._buckets = [];
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = BucketD;
|
||||
|
|
|
@ -36,13 +36,18 @@
|
|||
$operation_info 'labels' GET 'labels' STORE
|
||||
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'node' GET 'nodeID' STORE
|
||||
|
||||
// 'Fetching metrics for ' $labels ->JSON + LOGMSG
|
||||
// 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG
|
||||
|
||||
$read_token $labels $startTimestamp @utapi/getMetricsAt 'startResults' STORE
|
||||
{ 'labels' $labels 'node' $nodeID } 'opInfo' STORE
|
||||
|
||||
$read_token $labels $endTimestamp @utapi/getMetricsAt 'endResults' STORE
|
||||
$auth_info ->JSON $opInfo UNMAP 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'startResults' STORE
|
||||
$auth_info ->JSON $opInfo UNMAP 'end' $endTimestamp } ->JSON @utapi/getMetricsAt 'endResults' STORE
|
||||
|
||||
// $read_token $labels $startTimestamp $nodeID @utapi/getMetricsAt 'startResults' STORE
|
||||
// $read_token $labels $endTimestamp $nodeID @utapi/getMetricsAt 'endResults' STORE
|
||||
|
||||
// $startResults ->JSON LOGMSG
|
||||
// $endResults ->JSON LOGMSG
|
||||
|
|
|
@ -25,9 +25,21 @@
|
|||
<%
|
||||
'getMetricsAt' SECTION
|
||||
|
||||
'endTimestamp' STORE
|
||||
'labels' STORE
|
||||
'read_token' STORE
|
||||
JSON-> 'operation_info' STORE
|
||||
JSON-> 'auth_info' STORE
|
||||
|
||||
// $operation_info ->JSON LOGMSG
|
||||
|
||||
$auth_info 'read' GET 'read_token' STORE
|
||||
|
||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||
$operation_info 'labels' GET 'labels' STORE
|
||||
$operation_info 'node' GET 'nodeID' STORE
|
||||
|
||||
// 'nodeID' STORE
|
||||
// 'endTimestamp' STORE
|
||||
// 'labels' STORE
|
||||
// 'read_token' STORE
|
||||
|
||||
// Raise the max operations for a executing script
|
||||
// $read_token AUTHENTICATE
|
||||
|
@ -37,6 +49,7 @@
|
|||
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||
'utapi.snapshot' 'snapshot_class' STORE
|
||||
'utapi.repair.correction' 'correction_class' STORE
|
||||
'utapi.repair.reindex' 'reindex_class' STORE
|
||||
|
||||
{} 'snapshots' STORE
|
||||
|
||||
|
@ -248,6 +261,25 @@
|
|||
%> FOREACH
|
||||
%> FOREACH
|
||||
|
||||
'load_reindex' SECTION
|
||||
// Only load the latest reindex for the current node
|
||||
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
||||
{
|
||||
'token' $read_token
|
||||
'class' $reindex_class
|
||||
'labels' $filterLabels
|
||||
'end' $endTimestamp
|
||||
'count' 1
|
||||
} FETCH
|
||||
<% // Handle multiple GTS
|
||||
VALUES
|
||||
<% // For each reindex correction
|
||||
@utapi/decodeRecord
|
||||
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
|
||||
$results @util/sumRecord 'results' STORE
|
||||
%> FOREACH
|
||||
%> FOREACH
|
||||
|
||||
$results // Leave results on the stack
|
||||
|
||||
%>
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
<%
|
||||
EVAL @utapi/encodeRecord
|
||||
%>
|
Loading…
Reference in New Issue