Compare commits


12 Commits

Author SHA1 Message Date
Taylor McKinnon 84cb2113c2 cache deps 2020-09-16 13:05:34 -07:00
Taylor McKinnon 812d5a5897 stash 2020-09-16 12:43:34 -07:00
Taylor McKinnon 53ba468549 adjust authentication 2020-09-14 15:25:34 -07:00
Taylor McKinnon 9c58dcabea add route handler 2020-09-14 15:24:48 -07:00
Taylor McKinnon a5c52de18d Update warp 10 macros to allow calling getMetricsAt separately 2020-09-14 15:23:28 -07:00
Taylor McKinnon 659fb9918c Add route to openapi spec 2020-09-11 15:28:12 -07:00
Taylor McKinnon 12afb27d75 Merge branch 'feature/S3C-3286_add_reindex_task' into feature/S3C-3324_redis_backed_routes 2020-09-11 15:26:07 -07:00
Taylor McKinnon 309f380043 Add reindex task 2020-09-04 15:09:02 -07:00
Taylor McKinnon f3f12861c1 Update warp10 ingest method signature 2020-09-04 15:09:02 -07:00
Taylor McKinnon dd3067cf3e Add config and models 2020-09-04 15:09:02 -07:00
Taylor McKinnon 7695e7651b Fix utapiv1 bugs from async version bump 2020-09-04 15:09:02 -07:00
Taylor McKinnon 9a3cb48ad1 Add eve logging and update dependencies 2020-09-04 15:09:02 -07:00
46 changed files with 1632 additions and 129 deletions

@@ -3,5 +3,13 @@
     "rules": {
         "no-underscore-dangle": "off",
         "implicit-arrow-linebreak" : "off"
-    }
+    },
+    "settings": {
+        "import/resolver": {
+            "node": {
+                "paths": ["/backbeat/node_modules", "node_modules"]
+            }
+        }
+    }
 }

bin/reindex.js (new file, 14 lines)

@@ -0,0 +1,14 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');

const logger = new LoggerContext({
    task: 'Reindex',
});

const task = new tasks.ReindexTask();

task.setup()
    .then(() => logger.info('Starting Reindex daemon'))
    .then(() => task.start())
    .then(() => logger.info('Reindex started'));

@@ -16,14 +16,17 @@ models:
     type: kube_pod
     path: eve/workers/pod.yml
     images:
-      aggressor: eve/workers/unit_and_feature_tests
+      aggressor:
+        context: '.'
+        dockerfile: eve/workers/unit_and_feature_tests
       warp10:
         context: '.'
        dockerfile: 'images/warp10/Dockerfile'
       vault: eve/workers/mocks/vault
 - Install: &install
     name: install node modules
-    command: yarn install --frozen-lockfile
+    # command: yarn install --frozen-lockfile
+    command: echo '@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ Skipping Install @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@'
     haltOnFailure: True
 - Upload: &upload_artifacts
     source: /artifacts
@@ -76,6 +79,10 @@ stages:
       - ShellCommand:
           name: run client tests
           command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
+          logfiles:
+            utapi:
+              filename: "/artifacts/setup_ft_test:client.log"
+              follow: true
   run-server-tests:
     worker: *workspace
     steps:
@@ -84,6 +91,10 @@ stages:
       - ShellCommand:
           name: run server tests
           command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
+          logfiles:
+            utapi:
+              filename: "/artifacts/setup_ft_test:server.log"
+              follow: true
   run-cron-tests:
     worker: *workspace
     steps:
@@ -92,6 +103,10 @@ stages:
       - ShellCommand:
           name: run cron tests
           command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
+          logfiles:
+            utapi:
+              filename: "/artifacts/setup_ft_test:cron.log"
+              follow: true
   run-interval-tests:
     worker: *workspace
     steps:
@@ -100,6 +115,10 @@ stages:
       - ShellCommand:
           name: run interval tests
           command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
+          logfiles:
+            utapi:
+              filename: "/artifacts/setup_ft_test:interval.log"
+              follow: true
   run-v2-functional-tests:
     worker:
       << : *workspace
@@ -123,6 +142,7 @@ stages:
           command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
           env:
             UTAPI_CACHE_BACKEND: redis
+            UTAPI_LOG_LEVEL: trace
           logfiles:
             warp10:
               filename: "/artifacts/warp10.log"

@@ -3,10 +3,16 @@ FROM buildpack-deps:jessie-curl
 #
 # Install apt packages needed by utapi and buildbot_worker
 #
 ENV LANG C.UTF-8
 ENV NODE_VERSION 10.22.0
+ENV PATH=$PATH:/utapi/node_modules/.bin
+ENV NODE_PATH=/utapi/node_modules

 COPY utapi_packages.list buildbot_worker_packages.list /tmp/

+WORKDIR /utapi
+
 RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
     && tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
     && curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
@@ -19,7 +25,17 @@ RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x6
     && rm -f /etc/supervisor/conf.d/*.conf \
     && rm -f node-v${NODE_VERSION}-linux-x64.tar.gz

+COPY package.json yarn.lock /utapi/
+
+#
+# Install yarn dependencies
+#
+COPY package.json yarn.lock /utapi/
+RUN yarn cache clean \
+    && yarn install --frozen-lockfile \
+    && yarn cache clean
+
 #
 # Run buildbot-worker on startup through supervisor
 #

@@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
   SETUP_CMD="start"
 fi
-UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
+UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
 bash tests/utils/wait_for_local_port.bash $PORT 40
 UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"

@@ -490,7 +490,7 @@ class UtapiClient {
                 return done();
             }),
             // if cursor is 0, it reached end of scan
-            () => cursor === '0',
+            cb => cb(null, cursor === '0'),
             err => callback(err, keys),
         );
     }

@@ -49,6 +49,7 @@ class MemoryCache {
     async addToShard(shard, event) {
         const metricKey = schema.getUtapiMetricKey(this._prefix, event);
+        console.log(`MEM: ${metricKey}`)
         this._data[metricKey] = event;
         if (this._shards[shard]) {
             this._shards[shard].push(metricKey);

@@ -64,6 +64,7 @@ class RedisCache {
         return logger
             .logAsyncError(async () => {
                 const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
+                console.log(metricKey)
                 const shardKey = schema.getShardKey(this._prefix, shard);
                 const shardMasterKey = schema.getShardMasterKey(this._prefix);
                 logger.debug('adding metric to shard', { metricKey, shardKey });

@@ -2,6 +2,7 @@ const { callbackify } = require('util');
 const { Transform } = require('stream');
 const uuid = require('uuid');
 const needle = require('needle');
+const aws4 = require('aws4');

 // These modules are added via the `level-mem` package rather than individually
 /* eslint-disable import/no-extraneous-dependencies */
@@ -9,7 +10,7 @@ const levelup = require('levelup');
 const memdown = require('memdown');
 const encode = require('encoding-down');
 const { UtapiMetric } = require('../models');
-const { LoggerContext } = require('../utils');
+const { LoggerContext, asyncOrCallback } = require('../utils');
 /* eslint-enable import/no-extraneous-dependencies */

 const moduleLogger = new LoggerContext({
@@ -65,6 +66,7 @@
         });
     }
 }
+// const credentials = { accessKeyId, secretAccessKey, token };

 class UtapiClient {
     constructor(config) {
@@ -77,6 +79,10 @@ class UtapiClient {
         this._drainTimer = null;
         this._drainCanSchedule = true;
         this._drainDelay = (config && config.drainDelay) || 30000;
+        this._credentials = {
+            accessKeyId: (config && config.accessKeyId) || 'accessKey1',
+            secretAccessKey: (config && config.secretAccessKey) || 'verySecretKey1',
+        };
     }

     async join() {
@@ -259,6 +265,58 @@
         }
         return this._pushMetric(data);
     }

+    async _signedRequest(method, url, options = {}) {
+        const _url = new URL(url);
+        const reqOptions = {
+            method,
+            hostname: _url.host,
+            service: 's3',
+            path: `${_url.pathname}${_url.search}`,
+            signQuery: false,
+            body: options.body,
+        };
+
+        const signedOptions = aws4.sign(reqOptions, this._credentials);
+
+        const args = [method, url];
+        if (options.body !== undefined) {
+            args.push(options.body);
+        }
+        args.push({
+            ...options,
+            headers: signedOptions.headers,
+        });
+
+        return needle(...args);
+    }
+
+    /**
+     * Get the storageUtilized of a resource
+     *
+     * @param {string} level - level of metrics, currently only 'accounts' is supported
+     * @param {string} resource - id of the resource
+     * @param {Function|undefined} callback - optional callback
+     * @returns {Promise|undefined} - returns a Promise if no callback is provided, undefined otherwise
+     */
+    getStorage(level, resource, callback) {
+        if (level !== 'accounts') {
+            throw new Error('invalid level, only "accounts" is supported');
+        }
+
+        return asyncOrCallback(async () => {
+            const resp = await this._signedRequest(
+                'get',
+                `http://${this._host}:${this._port}/v2/storage/${level}/${resource}`,
+            );
+
+            if (resp.statusCode !== 200) {
+                throw new Error(`unable to retrieve metrics: ${resp.statusMessage}`);
+            }

+            return resp.body;
+        }, callback);
+    }
 }

 module.exports = UtapiClient;
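For reference, the new getStorage call supports both invocation styles that asyncOrCallback enables; a minimal usage sketch, where the require path, host/port options, and account ID are illustrative rather than taken from this diff:

const UtapiClient = require('../../../libV2/client');

// Assumed constructor options; credentials fall back to accessKey1/verySecretKey1 as above.
const client = new UtapiClient({ host: 'localhost', port: 8100 });

// Promise form: no callback is passed, so asyncOrCallback returns a Promise.
client.getStorage('accounts', 'account-canonical-id')
    .then(res => console.log(res.storageUtilized))
    .catch(err => console.error(err));

// Callback form: the same request routed through callbackify.
client.getStorage('accounts', 'account-canonical-id', (err, res) => {
    if (err) {
        console.error(err);
        return;
    }
    console.log(res.storageUtilized);
});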

@@ -30,5 +30,10 @@
     "ingestionSchedule": "*/5 * * * * *",
     "checkpointSchedule": "*/30 * * * * *",
     "snapshotSchedule": "* 0 * * * *",
-    "repairSchedule": "* */5 * * * *"
+    "repairSchedule": "* */5 * * * *",
+    "bucketd": [ "localhost:9000" ],
+    "reindex": {
+        "enabled": true,
+        "schedule": "0 0 0 * * 6"
+    }
 }

@@ -285,6 +285,8 @@ class Config {
             port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
         };

+        parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
+
         return parsedConfig;
     }

@@ -32,7 +32,6 @@ const warp10MultiHost = Joi.object({
     writeToken: Joi.string(),
 });

-Joi.array().items(warp10SingleHost);

 const tlsSchema = Joi.object({
     key: Joi.string(),
@@ -62,6 +61,11 @@ const schema = Joi.object({
         host: Joi.string().hostname(),
         port: Joi.number().port(),
     }),
+    reindex: Joi.object({
+        enabled: Joi.boolean(),
+        schedule: Joi.string(),
+    }),
+    bucketd: Joi.array().items(Joi.string()),
     expireMetrics: Joi.boolean(),
     expireMetricsTTL: Joi.number(),
     cacheBackend: Joi.string().valid('memory', 'redis'),

@@ -84,13 +84,15 @@ const constants = {
         buckets: 'bck',
     },
-    warp10ValueType: ':m:utapi/event:',
+    warp10EventType: ':m:utapi/event:',
+    warp10RecordType: ':m:utapi/record:',
     truthy,
     shardIngestLagSecs: 30,
     checkpointLagSecs: 300,
     snapshotLagSecs: 900,
     repairLagSecs: 5,
     counterBaseValueExpiration: 86400, // 24hrs
+    keyVersionSplitter: String.fromCharCode(0),
 };

 constants.operationToResponse = constants.operations

libV2/metadata/client.js (new file, 16 lines)

@@ -0,0 +1,16 @@
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
const bucketclient = require('bucketclient');

const config = require('../config');
const { LoggerContext } = require('../utils');

const moduleLogger = new LoggerContext({
    module: 'metadata.client',
});

const params = {
    bucketdBootstrap: config.bucketd,
    https: config.https,
};

module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

libV2/metadata/index.js (new file, 116 lines)

@@ -0,0 +1,116 @@
/* eslint-disable no-restricted-syntax */
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal/lib/constants');
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');

const moduleLogger = new LoggerContext({
    module: 'metadata.client',
});

const PAGE_SIZE = 1000;

function _listingWrapper(bucket, params) {
    return new Promise(
        (resolve, reject) => metadata.listObject(
            bucket,
            params,
            logger.newRequestLogger(),
            (err, res) => {
                if (err) {
                    reject(err);
                    return;
                }
                resolve(res);
            },
        ),
    );
}

function _listObject(bucket, prefix, hydrateFunc) {
    const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
    let gt;
    return {
        async* [Symbol.asyncIterator]() {
            while (true) {
                let res;
                try {
                    // eslint-disable-next-line no-await-in-loop
                    res = await _listingWrapper(bucket, { ...listingParams, gt });
                } catch (error) {
                    moduleLogger.error('Error during listing', { error });
                    throw error;
                }

                for (const item of res) {
                    yield hydrateFunc ? hydrateFunc(item) : item;
                }

                if (res.length === 0 || res.length !== PAGE_SIZE) {
                    break;
                }

                gt = res[res.length - 1].key;
            }
        },
    };
}

function listObjects(bucket) {
    return _listObject(bucket, '', data => {
        const { key, value } = data;
        const [name, version] = key.split(keyVersionSplitter);
        return {
            name,
            version,
            value: JSON.parse(value),
        };
    });
}

function listBuckets() {
    return _listObject(usersBucket, '', data => {
        const { key, value } = data;
        const [account, name] = key.split(mdKeySplitter);
        return {
            account,
            name,
            value: JSON.parse(value),
        };
    });
}

async function listMPUs(bucket) {
    const mpuBucket = `${mpuBucketPrefix}${bucket}`;
    return _listObject(mpuBucket, '', data => {
        const { key, value } = data;
        const [account, name] = key.split(mdKeySplitter);
        return {
            account,
            name,
            value: JSON.parse(value),
        };
    });
}

function bucketExists(bucket) {
    return new Promise((resolve, reject) => metadata.getBucketAttributes(
        bucket,
        logger.newRequestLogger(),
        err => {
            if (err && !err.NoSuchBucket) {
                reject(err);
                return;
            }
            resolve(err === null);
        },
    ));
}

module.exports = {
    listBuckets,
    listObjects,
    listMPUs,
    bucketExists,
};
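The listing helpers above return bare async iterables, so callers drive pagination with for await; a minimal consumption sketch, with an illustrative require path and bucket name:

const metadata = require('../libV2/metadata');

async function sumBucketSize(bucket) {
    let size = 0;
    // Pages of up to PAGE_SIZE keys are fetched lazily as the loop advances.
    for await (const obj of metadata.listObjects(bucket)) {
        size += obj.value['content-length'];
    }
    return size;
}

sumBucketSize('example-bucket').then(console.log);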

@@ -34,7 +34,7 @@ class RequestContext extends RequestContextModel {
         const tag = request.swagger.operation['x-router-controller'];
         const { operationId } = request.swagger.operation;

-        request.logger.addDefaultFields({
+        request.logger.logger.addDefaultFields({
             tag,
             operationId,
             service: 'utapi',

@@ -0,0 +1,12 @@
const Joi = require('@hapi/joi');

const { buildModel } = require('./Base');

const recordSchema = {
    timestamp: Joi.number(),
    objectDelta: Joi.number(),
    sizeDelta: Joi.number(),
    incomingBytes: Joi.number(),
    outgoingBytes: Joi.number(),
};

module.exports = buildModel('UtapiRecord', recordSchema);

@@ -1,5 +1,6 @@
 const BaseModel = require('./Base');
 const UtapiMetric = require('./UtapiMetric');
+const UtapiRecord = require('./UtapiRecord');
 const RequestContext = require('./RequestContext');
 const ResponseContainer = require('./ResponseContainer');
@@ -8,4 +9,5 @@ module.exports = {
     UtapiMetric,
     RequestContext,
     ResponseContainer,
+    UtapiRecord,
 };

@@ -0,0 +1,49 @@
const errors = require('../../../errors');
const { serviceToWarp10Label } = require('../../../constants');
const { client: warp10 } = require('../../../warp10');
const { client: cache } = require('../../../cache');
const { now } = require('../../../utils');
const config = require('../../../config');

async function getStorage(ctx, params) {
    const { level, resource } = params;

    const [counter, base] = await cache.fetchAccountSizeCounter(resource);

    let storageUtilized;

    if (base !== null) {
        storageUtilized = counter + base;
    } else {
        const labelName = serviceToWarp10Label[params.level];
        const labels = { [labelName]: resource };

        const options = {
            params: {
                end: now(),
                labels,
                node: config.nodeId,
            },
            macro: 'utapi/getMetricsAt',
        };
        const res = await warp10.exec(options);
        if (res.result.length === 0) {
            ctx.logger.error('unable to retrieve metrics', { level, resource });
            throw errors.InternalError;
        }

        const { sizeD: currentSize } = res.result[0];
        await cache.updateAccountCounterBase(resource, currentSize);
        storageUtilized = currentSize;
    }

    ctx.results.statusCode = 200;
    ctx.results.body = {
        storageUtilized,
        resource,
        level,
    };
}

module.exports = getStorage;
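The fast path in this handler is pure arithmetic over the cached counter and its base; a worked sketch with illustrative numbers:

// Suppose the base was cached at 2048 bytes during the last warp10 read, and
// three 512-byte PUTs have since incremented the account counter to 1536:
const [counter, base] = [1536, 2048]; // shape returned by fetchAccountSizeCounter
const storageUtilized = counter + base; // 3584 bytes, served without a warp10 call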

@@ -2,6 +2,7 @@ const errors = require('../../../errors');
 const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
 const { convertTimestamp } = require('../../../utils');
 const { client: warp10 } = require('../../../warp10');
+const config = require('../../../config');

 const emptyOperationsResponse = Object.values(operationToResponse)
     .reduce((prev, key) => {
@@ -32,6 +33,7 @@ async function listMetric(ctx, params) {
             start,
             end,
             labels,
+            node: config.nodeId,
         },
         macro: 'utapi/getMetrics',
     };

@@ -126,19 +126,20 @@ class APIController {
     static async _callOperation(handler, request, response, params) {
         try {
             await handler(request.ctx, params);
-        } catch (err) {
-            request.logger.error('error during operation', { err });
-            throw err;
+        } catch (error) {
+            console.log(error)
+            request.logger.error('error during operation', { error });
+            throw error;
         }
         request.logger.debug('writing operation result');
         try {
             await APIController._writeResult(request.ctx.results, response);
-        } catch (err) {
+        } catch (error) {
             request.logger.error(
                 'error while writing operation result',
-                { err },
+                { error },
             );
-            throw err;
+            throw error;
         }
     }

@@ -85,15 +85,35 @@ async function initializeOasTools(spec, app) {
 async function authV4Middleware(request, response, params) {
     const authHeader = request.headers.authorization;
     if (!authHeader || !authHeader.startsWith('AWS4-')) {
-        request.log.error('missing auth header for v4 auth');
+        request.logger.error('missing auth header for v4 auth');
         throw errors.InvalidRequest.customizeDescription('Must use Auth V4 for this request.');
     }

+    let action;
+    let level;
+    let requestedResources = [];
+
+    if (params.level) {
+        // Can't destructure here
+        // eslint-disable-next-line prefer-destructuring
+        level = params.level;
+        requestedResources = [params.resource];
+        action = 'ListMetrics';
+    } else {
+        requestedResources = params.body[params.resource];
+        level = params.resource;
+        action = params.Action.value;
+    }
+
+    if (requestedResources.length === 0) {
+        throw errors.InvalidRequest.customizeDescription('You must specify at least one resource');
+    }
+
     let passed;
     let authorizedResources;
     try {
-        [passed, authorizedResources] = await authenticateRequest(request, params);
+        [passed, authorizedResources] = await authenticateRequest(request, action, level, requestedResources);
     } catch (error) {
         request.logger.error('error during authentication', { error });
         throw errors.InternalError;
@@ -104,8 +124,8 @@ async function authV4Middleware(request, response, params) {
         throw errors.AccessDenied;
     }

-    if (authorizedResources !== undefined) {
-        params.body[params.resource.value] = authorizedResources;
+    if (params.level === undefined && authorizedResources !== undefined) {
+        params.body[params.resource] = authorizedResources;
     }
 }
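The two branches correspond to the two request shapes the spec now allows; roughly, with illustrative field values:

// Path-style (the new GET /v2/storage/{level}/{resource}):
//   params = { level: 'accounts', resource: '<account-id>' }
//   -> action = 'ListMetrics', level = 'accounts', resources = ['<account-id>']
//
// Body-style (the existing POST /{resource}?Action=...):
//   params = { resource: 'accounts', Action: { value: 'ListMetrics' },
//              body: { accounts: ['<id-1>', '<id-2>'] } }
//   -> action, level, and resources are taken from the query and body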

@@ -59,7 +59,7 @@ class IngestShardTask extends BaseTask {
                     return new UtapiMetric(metric);
                 });
             }
-            const status = await this._warp10.ingest(metricClass, records);
+            const status = await this._warp10.ingest({ className: metricClass }, records);
             assert.strictEqual(status, records.length);
             await this._cache.deleteShard(shard);
         } else {

libV2/tasks/Reindex.js (new file, 177 lines)

@@ -0,0 +1,177 @@
/* eslint-disable no-restricted-syntax */
const async = require('async');
const { mpuBucketPrefix } = require('arsenal/lib/constants');
const BaseTask = require('./BaseTask');
const { UtapiRecord } = require('../models');
const config = require('../config');
const metadata = require('../metadata');
const { serviceToWarp10Label, warp10RecordType } = require('../constants');

const { LoggerContext, convertTimestamp } = require('../utils');

const logger = new LoggerContext({
    module: 'ReindexTask',
});

class ReindexTask extends BaseTask {
    constructor(options) {
        super({
            warp10: {
                requestTimeout: 30000,
                connectTimeout: 30000,
            },
            ...options,
        });
        this._defaultSchedule = config.reindexSchedule;
        this._defaultLag = 0;
    }

    static async _indexBucket(bucket) {
        let size = 0;
        let count = 0;
        let lastMaster = null;
        let lastMasterSize = null;

        for await (const obj of metadata.listObjects(bucket)) {
            if (obj.value.isDeleteMarker) {
                // eslint-disable-next-line no-continue
                continue;
            }
            count += 1;
            size += obj.value['content-length'];

            // If versioned, subtract the size of the master to avoid double counting
            if (lastMaster && obj.name === lastMaster) {
                logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
                size -= lastMasterSize;
                count -= 1;
                lastMaster = null;
                lastMasterSize = null;
            // Only save master versions
            } else if (!obj.version) {
                lastMaster = obj.name;
                lastMasterSize = obj.value['content-length'];
            }
        }

        return { size, count };
    }

    static async _indexMpuBucket(bucket) {
        if (await metadata.bucketExists(bucket)) {
            return ReindexTask._indexBucket(bucket);
        }
        return { size: 0, count: 0 };
    }

    async _fetchCurrentMetrics(level, resource) {
        const timestamp = convertTimestamp(new Date().getTime());
        const options = {
            params: {
                start: timestamp,
                end: timestamp,
                node: this._nodeId,
                labels: {
                    [level]: resource,
                },
            },
            macro: 'utapi/getMetrics',
        };
        const res = await this._warp10.exec(options);
        return { timestamp, value: JSON.parse(res.result[0]) };
    }

    async _updateMetric(level, resource, total) {
        const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);

        const objectDelta = total.count - value.numberOfObjects[0];
        const sizeDelta = total.size - value.storageUtilized[0];

        if (objectDelta !== 0 || sizeDelta !== 0) {
            logger.info('discrepancy detected in metrics, writing corrective record',
                { [level]: resource, objectDelta, sizeDelta });

            const record = new UtapiRecord({
                objectDelta,
                sizeDelta,
                timestamp,
            });

            await this._warp10.ingest(
                {
                    className: 'utapi.repair.reindex',
                    labels: {
                        [level]: resource,
                    },
                    valueType: warp10RecordType,
                },
                [record],
            );
        }
    }

    async _execute() {
        logger.debug('reindexing objects');

        const accountTotals = {};
        const ignoredAccounts = new Set();

        await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
            logger.trace('starting reindex of bucket', { bucket: bucket.name });

            const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
            let bktTotal;
            let mpuTotal;
            try {
                bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
                mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
            } catch (error) {
                logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
                ignoredAccounts.add(bucket.account);
                return;
            }

            const total = {
                size: bktTotal.size + mpuTotal.size,
                count: bktTotal.count + mpuTotal.count,
            };

            if (accountTotals[bucket.account]) {
                accountTotals[bucket.account].size += total.size;
                accountTotals[bucket.account].count += total.count;
            } else {
                accountTotals[bucket.account] = { ...total };
            }

            logger.trace('finished indexing bucket', { bucket: bucket.name });

            await Promise.all([
                this._updateMetric(
                    serviceToWarp10Label.buckets,
                    bucket.name,
                    bktTotal,
                ),
                this._updateMetric(
                    serviceToWarp10Label.buckets,
                    mpuBucket,
                    mpuTotal,
                )]);
        });

        const toUpdate = Object.entries(accountTotals)
            .filter(([account]) => !ignoredAccounts.has(account));

        await async.eachLimit(toUpdate, 5, async ([account, total]) =>
            this._updateMetric(
                serviceToWarp10Label.accounts,
                account,
                total,
            ));

        logger.debug('finished reindexing');
    }
}

module.exports = ReindexTask;
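The master-version bookkeeping in _indexBucket is easiest to see on a hypothetical listing of one versioned key (entry values are illustrative):

// Hypothetical listing order (master entry first, then its version entry):
//   { name: 'key1', version: undefined, value: { 'content-length': 100 } }
//   { name: 'key1', version: 'v1',      value: { 'content-length': 100 } }
//   { name: 'key2', version: undefined, value: { 'content-length': 50 } }
//
// key1 master:  count=1, size=100, lastMaster='key1'
// key1 version: name === lastMaster, so the master's 100 bytes and 1 object
//               are subtracted after this entry is added -> the pair counts once
// key2 master:  count=2, size=150 -> final totals { size: 150, count: 2 }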

@@ -3,6 +3,7 @@ const IngestShard = require('./IngestShard');
 const CreateCheckpoint = require('./CreateCheckpoint');
 const CreateSnapshot = require('./CreateSnapshot');
 const RepairTask = require('./Repair');
+const ReindexTask = require('./Reindex');

 module.exports = {
     IngestShard,
@@ -10,4 +11,5 @@ module.exports = {
     CreateCheckpoint,
     CreateSnapshot,
     RepairTask,
+    ReindexTask,
 };

libV2/utils/func.js (new file, 20 lines)

@@ -0,0 +1,20 @@
const { callbackify } = require('util');

/**
 * Convenience function to handle "if no callback then return a promise" pattern
 *
 * @param {Function} asyncFunc - asyncFunction to call
 * @param {Function|undefined} callback - optional callback
 * @returns {Promise|undefined} - returns a Promise if no callback is passed
 */
function asyncOrCallback(asyncFunc, callback) {
    if (typeof callback === 'function') {
        callbackify(asyncFunc)(callback);
        return undefined;
    }

    return asyncFunc();
}

module.exports = {
    asyncOrCallback,
};
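A minimal sketch of wrapping an async function with this helper; the function and key names are illustrative:

const { asyncOrCallback } = require('../libV2/utils');

// Callers may pass a node-style callback or consume the returned Promise.
function fetchValue(key, callback) {
    return asyncOrCallback(async () => `value-for-${key}`, callback);
}

fetchValue('a').then(console.log);                 // promise form
fetchValue('a', (err, res) => console.log(res));   // callback form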

@@ -1,9 +1,11 @@
 const log = require('./log');
 const shard = require('./shard');
 const timestamp = require('./timestamp');
+const func = require('./func');

 module.exports = {
     ...log,
     ...shard,
     ...timestamp,
+    ...func,
 };

@@ -11,51 +11,67 @@ werelogs.configure(loggerConfig);
 const rootLogger = new werelogs.Logger('Utapi');

 class LoggerContext {
-    constructor(defaults) {
+    constructor(defaults, logger = null) {
         this._defaults = defaults;
+        this.logger = logger !== null ? logger : rootLogger;
     }

     get defaults() {
         return this._defaults || {};
     }

+    static expandError(data) {
+        if (data && data.error) {
+            return { ...data, errmsg: data.error.message, stack: data.error.stack };
+        }
+        return data;
+    }
+
+    _collectData(data) {
+        return { ...this.defaults, ...LoggerContext.expandError(data) };
+    }
+
     with(extraDefaults) {
-        return new LoggerContext({ ...this.defaults, ...extraDefaults });
+        return new LoggerContext({ ...this.defaults, ...extraDefaults }, this.logger);
+    }
+
+    withLogger(logger) {
+        return new LoggerContext({ ...this.defaults }, logger);
     }

     info(msg, data = {}) {
-        return rootLogger.info(msg, { ...this.defaults, ...data });
+        return this.logger.info(msg, this._collectData(data));
     }

     debug(msg, data = {}) {
-        return rootLogger.debug(msg, { ...this.defaults, ...data });
+        return this.logger.debug(msg, this._collectData(data));
     }

     trace(msg, data = {}) {
-        return rootLogger.trace(msg, { ...this.defaults, ...data });
+        return this.logger.trace(msg, this._collectData(data));
     }

     warn(msg, data = {}) {
-        return rootLogger.warn(msg, { ...this.defaults, ...data });
+        return this.logger.warn(msg, this._collectData(data));
     }

     error(msg, data = {}) {
-        let _data = data;
-        if (data && data.error) {
-            _data = { ...data, errmsg: data.error.message, stack: data.error.stack };
-        }
-        return rootLogger.error(msg, { ...this.defaults, ..._data });
+        return this.logger.error(msg, this._collectData(data));
     }

     fatal(msg, data = {}) {
-        return rootLogger.fatal(msg, { ...this.defaults, ...data });
+        return this.logger.fatal(msg, this._collectData(data));
+    }
+
+    end(msg, data = {}) {
+        return this.logger.end(msg, this._collectData(data));
     }

     async logAsyncError(func, msg, data = {}) {
         try {
             return await func();
         } catch (error) {
-            this.error(msg, { error, ...data });
+            this.error(msg, { ...data, error });
             throw error;
         }
     }
@@ -83,7 +99,7 @@ function buildRequestLogger(req) {
     };

     reqLogger.addDefaultFields(defaultInfo);
-    return reqLogger;
+    return new LoggerContext({}, reqLogger);
 }

 module.exports = {
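A usage sketch of the reworked LoggerContext, assuming a request object whose logger is now a LoggerContext as returned by buildRequestLogger above (field values are illustrative):

const { LoggerContext } = require('../libV2/utils');

// Module-level context carrying static fields.
const moduleLogger = new LoggerContext({ module: 'example' });

// Per-request narrowing: bind the underlying werelogs request logger,
// keeping the module's default fields; data merges left to right.
function onRequest(request) {
    const reqLogger = moduleLogger.withLogger(request.logger.logger);
    reqLogger.info('handling request', { operation: 'getStorage' });
}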

@@ -32,7 +32,18 @@ class InterpolatedClock {
     }
 }

+/**
+ * Returns the current time as
+ * the number of microseconds since the epoch
+ *
+ * @returns {Number} - current timestamp
+ */
+function now() {
+    return new Date().getTime() * 1000;
+}
+
 module.exports = {
     convertTimestamp,
     InterpolatedClock,
+    now,
 };

@@ -85,19 +85,19 @@ class Vault {
 const handler = new Vault(config);
 auth.setHandler(handler);

-async function authenticateRequest(request, params) {
+async function authenticateRequest(request, action, level, resources) {
     const policyContext = new policies.RequestContext(
         request.headers,
-        params.resource,
-        params.body[params.resource],
+        level,
+        resources,
         request.ip,
         request.ctx.encrypted,
-        params.Action.value,
+        action,
         'utapi',
     );

     return new Promise((resolve, reject) => {
-        auth.server.doAuth(request, request.logger, (err, res) => {
+        auth.server.doAuth(request, request.logger.logger, (err, res) => {
             if (err && (err.InvalidAccessKeyId || err.AccessDenied)) {
                 resolve([false]);
                 return;

@@ -1,5 +1,6 @@
 const { Warp10 } = require('@senx/warp10');
+const assert = require('assert');
-const { eventFieldsToWarp10, warp10ValueType } = require('./constants');
+const { eventFieldsToWarp10, warp10EventType } = require('./constants');
 const _config = require('./config');
 const { LoggerContext } = require('./utils');
 const errors = require('./errors');
@@ -44,29 +45,39 @@ class Warp10Client {
                 // eslint-disable-next-line no-await-in-loop
                 return await func(client, ...params);
             } catch (error) {
-                moduleLogger.warn('error during warp10 operation, failing over to next host', { error });
+                moduleLogger.warn('error during warp10 operation, failing over to next host',
+                    { statusCode: error.statusCode, statusMessage: error.statusMessage, error });
             }
         }
         moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
         throw errors.InternalError;
     }

-    static _packEvent(event) {
+    static _packEvent(valueType, event) {
         const packed = Object.entries(event.getValue())
             .filter(([key]) => eventFieldsToWarp10[key])
             .map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
             .join(' ');
-        return `${warp10ValueType}{ ${packed} }`;
+        return `${valueType}{ ${packed} }`;
     }

-    _buildGTSEntry(className, event) {
-        const labels = this._clients[0].formatLabels({ node: this._nodeId });
-        const packed = Warp10Client._packEvent(event);
-        return `${event.timestamp}// ${className}${labels} ${packed}`;
+    _buildGTSEntry(className, valueType, labels, event) {
+        const _labels = this._clients[0].formatLabels({ node: this._nodeId, ...labels });
+        const packed = Warp10Client._packEvent(valueType, event);
+        return `${event.timestamp}// ${className}${_labels} ${packed}`;
     }

-    async _ingest(warp10, className, events) {
-        const payload = events.map(ev => this._buildGTSEntry(className, ev));
+    async _ingest(warp10, metadata, events) {
+        const { className, valueType, labels } = metadata;
+        assert.notStrictEqual(className, undefined, 'you must provide a className');
+        const payload = events.map(
+            ev => this._buildGTSEntry(
+                className,
+                valueType || warp10EventType,
+                labels || {},
+                ev,
+            ),
+        );
         const res = await warp10.update(this._writeToken, payload);
         return res.count;
     }
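With caller-supplied labels and value type now interpolated, a GTS input line produced by _buildGTSEntry looks roughly as follows; the timestamp, node ID, label rendering, and field names are illustrative assumptions, not output captured from this diff:

// Given className 'utapi.repair.reindex', labels { bck: 'my-bucket' },
// valueType warp10RecordType, and an event timestamped 1600000000000000,
// the line handed to warp10.update is approximately:
// 1600000000000000// utapi.repair.reindex{node=n1,bck=my-bucket} :m:utapi/record:{ 'sizeD' 1024 'objD' 1 }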

@@ -84,7 +84,31 @@ components:
           type: string
         message:
           type: string
+    utapi-get-storage-v1:
+      description: storageUtilized for a single resource
+      content:
+        application/json:
+          schema:
+            type: object
+            required:
+              - storageUtilized
+              - resource
+              - level
+            properties:
+              storageUtilized:
+                type: integer
+              resource:
+                type: string
+              level:
+                type: string
   parameters:
+    level:
+      in: path
+      name: level
+      required: true
+      schema:
+        type: string
+        enum: [ 'accounts' ]
     resource:
       in: path
       name: resource
@@ -125,7 +149,20 @@ paths:
           application/json:
             schema:
               $ref: '#/components/schemas/utapi-metric-v1'
+  /v2/storage/{level}/{resource}:
+    parameters:
+      - $ref: '#/components/parameters/resource'
+      - $ref: '#/components/parameters/level'
+    get:
+      x-router-controller: metrics
+      x-authv4: true
+      description: Get current storage utilized
+      operationId: getStorage
+      responses:
+        default:
+          $ref: '#/components/responses/json-error'
+        200:
+          $ref: '#/components/responses/utapi-get-storage-v1'
   /{resource}:
     parameters:
       - $ref: '#/components/parameters/resource'

@@ -19,9 +19,10 @@
     "dependencies": {
         "@hapi/joi": "^17.1.1",
         "@senx/warp10": "^1.0.10",
-        "arsenal": "scality/Arsenal#32c895b",
-        "async": "^2.0.1",
+        "arsenal": "scality/Arsenal#aa9c9e5",
+        "async": "^3.2.0",
         "body-parser": "^1.19.0",
+        "bucketclient": "scality/bucketclient",
         "commander": "^5.1.0",
         "cron-parser": "^2.15.0",
         "express": "^4.17.1",
@@ -62,6 +63,7 @@
         "start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
         "start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
         "start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
+        "start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
         "start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
         "start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
     }

@@ -201,7 +201,7 @@ describe('UtapiReindex', () => {
                 shouldLeave = res === value;
                 return setTimeout(next, 200);
             }),
-            () => shouldLeave, cb);
+            next => next(null, shouldLeave), cb);
     }

     function checkMetrics({ resource, expected }, cb) {

@@ -63,7 +63,7 @@ async function listMetrics(level, resources, start, end) {
 }

 async function ingestEvents(events) {
-    return events.length === await warp10.ingest('utapi.event', events);
+    return events.length === await warp10.ingest({ className: 'utapi.event' }, events);
 }

 function opsToResp(operations) {

@@ -59,7 +59,7 @@ describe('Test CreateCheckpoint', function () {
         const { events, totals } = generateCustomEvents(start, stop, 100,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(0));

         const results = await warp10.fetch({
@@ -88,8 +88,8 @@ describe('Test CreateCheckpoint', function () {
             accounts,
         );

-        await warp10.ingest('utapi.event', historicalEvents);
-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, historicalEvents);
+        await warp10.ingest({ className: 'utapi.event' }, events);

         await checkpointTask._execute(getTs(-400));
         await checkpointTask._execute(getTs(0));
@@ -109,7 +109,7 @@ describe('Test CreateCheckpoint', function () {
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
         );

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(-100));

         let results = await warp10.fetch({

@@ -67,7 +67,7 @@ describe('Test CreateSnapshot', function () {
         const { events, totals } = generateCustomEvents(start, stop, 100,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(-1));
         await snapshotTask._execute(getTs(0));
@@ -86,7 +86,7 @@ describe('Test CreateSnapshot', function () {
         const { events, totals } = generateCustomEvents(start, stop, 500,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(-400));
         await checkpointTask._execute(getTs(-300));
         await checkpointTask._execute(getTs(-200));
@@ -110,7 +110,7 @@ describe('Test CreateSnapshot', function () {
         const { events, totals } = generateCustomEvents(start, stop, 500,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(-300));
         await snapshotTask._execute(getTs(-250));
@@ -132,11 +132,11 @@ describe('Test CreateSnapshot', function () {
         const stop = getTs(-50);
         const { events, totals } = generateCustomEvents(start, stop, 500, accounts);

-        await warp10.ingest('utapi.event', events);
+        await warp10.ingest({ className: 'utapi.event' }, events);
         await checkpointTask._execute(getTs(-1));

         const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
-        await warp10.ingest('utapi.event', newEvents);
+        await warp10.ingest({ className: 'utapi.event' }, newEvents);
         await checkpointTask._execute(getTs(100));

         await snapshotTask._execute(getTs(0));
@@ -156,7 +156,7 @@ describe('Test CreateSnapshot', function () {
         const { events, totals } = generateCustomEvents(start, stop, 100,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.repair.event', events);
+        await warp10.ingest({ className: 'utapi.repair.event' }, events);
         await repairTask._execute(getTs(-1));
         await snapshotTask._execute(getTs(0));

@@ -0,0 +1,89 @@
const assert = require('assert');
const uuid = require('uuid');
const { mpuBucketPrefix } = require('arsenal/lib/constants');

const { Warp10Client } = require('../../../../libV2/warp10');
const { ReindexTask } = require('../../../../libV2/tasks');
const { BucketD, values } = require('../../../utils/mock/');
const { protobuf } = require('../../../utils/v2Data');

const { CANONICAL_ID, BUCKET_NAME } = values;

const bucketCounts = [1, 1001];

const bucketRecord = {
    ops: {},
    sizeD: 1024,
    objD: 1,
    inB: 0,
    outB: 0,
};

const accountRecord = {
    ops: {},
    sizeD: 2048,
    objD: 2,
    inB: 0,
    outB: 0,
};

// eslint-disable-next-line func-names
describe('Test ReindexTask', function () {
    this.timeout(1200000);

    let prefix;
    let warp10;
    let reindexTask;
    const bucketd = new BucketD(true);

    before(() => bucketd.start());

    beforeEach(() => {
        prefix = uuid.v4();
        reindexTask = new ReindexTask({ warp10: { nodeId: prefix } });
        reindexTask._program = { nodeId: prefix };
        warp10 = new Warp10Client({ nodeId: prefix });
    });

    afterEach(() => {
        bucketd.reset();
    });

    async function assertResult(labels, value) {
        const results = await warp10.fetch({
            className: 'utapi.repair.reindex',
            labels,
            start: 'now',
            stop: -100,
        });

        assert.strictEqual(results.result.length, 1);
        assert.notStrictEqual(results.result[0], '');
        const series = JSON.parse(results.result[0]);
        assert.strictEqual(series.length, 1);
        const record = protobuf.decode('Record', series[0].v[0][1]);
        assert.deepStrictEqual(record, value);
    }

    bucketCounts.forEach(count => {
        it(`should reindex bucket listing with a length of ${count}`, async () => {
            const bucket = `${BUCKET_NAME}-${count}`;
            const mpuBucket = `${mpuBucketPrefix}${bucket}`;
            bucketd
                .setBucketContent({
                    bucketName: bucket,
                    contentLength: 1024,
                })
                .setBucketContent({
                    bucketName: mpuBucket,
                    contentLength: 1024,
                })
                .setBucketCount(count)
                .createBuckets();

            await reindexTask._execute();

            await assertResult({ bck: bucket, node: prefix }, bucketRecord);
            await assertResult({ bck: mpuBucket, node: prefix }, bucketRecord);
            await assertResult({ acc: CANONICAL_ID, node: prefix }, accountRecord);
        });
    });
});

@@ -60,7 +60,7 @@ describe('Test Repair', function () {
         const { events, totals } = generateCustomEvents(start, stop, 100,
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });

-        await warp10.ingest('utapi.repair.event', events);
+        await warp10.ingest({ className: 'utapi.repair.event' }, events);
         await repairTask._execute(getTs(0));

         const results = await warp10.fetch({
@@ -89,8 +89,8 @@ describe('Test Repair', function () {
             accounts,
         );

-        await warp10.ingest('utapi.repair.event', historicalEvents);
-        await warp10.ingest('utapi.repair.event', events);
+        await warp10.ingest({ className: 'utapi.repair.event' }, historicalEvents);
+        await warp10.ingest({ className: 'utapi.repair.event' }, events);

         await repairTask._execute(getTs(-400));
         await repairTask._execute(getTs(0));
@@ -110,7 +110,7 @@ describe('Test Repair', function () {
             { [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
         );

-        await warp10.ingest('utapi.repair.event', events);
+        await warp10.ingest({ className: 'utapi.repair.event' }, events);
         await repairTask._execute(getTs(-100));

         let results = await warp10.fetch({

@@ -1,24 +1,49 @@
 const assert = require('assert');
+const async = require('async');
 const sinon = require('sinon');
+const uuid = require('uuid');

-const { generateFakeEvents } = require('../../utils/v2Data');
 const UtapiClient = require('../../../libV2/client');
+const config = require('../../../libV2/config');
+const { CacheClient, backends: cacheBackends } = require('../../../libV2/cache');
+const { IngestShard } = require('../../../libV2/tasks');
+const { generateCustomEvents } = require('../../utils/v2Data');

-const events = generateFakeEvents(1, 50, 50);
+const getClient = prefix => new CacheClient({
+    cacheBackend: new cacheBackends.RedisCache(
+        config.cache,
+        prefix,
+    ),
+    counterBackend: new cacheBackends.RedisCache(
+        config.cache,
+        prefix,
+    ),
+});
+
+const { events, totals } = generateCustomEvents(1, 50, 50, {
+    [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
+});

-// Unskip after server side support is added
 // eslint-disable-next-line func-names
-describe.skip('Test UtapiClient', function () {
+describe('Test UtapiClient', function () {
     this.timeout(10000);

     let client;
     let sandbox;
+    let events;
+    let totals;

     beforeEach(() => {
         sandbox = sinon.createSandbox();
         client = new UtapiClient({
             drainDelay: 5000,
         });
+        const { events: _events, totals: _totals } = generateCustomEvents(1, 50, 50, {
+            [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
+        });
+        events = _events;
+        totals = _totals;
     });

     afterEach(() => {
@@ -52,5 +77,35 @@ describe('Test UtapiClient', function () {
             }, 6000);
         });
     });

+    describe.only('Test getStorage', () => {
+        let prefix;
+        let cacheClient;
+        let ingestTask;
+
+        beforeEach(async () => {
+            prefix = uuid.v4();
+            cacheClient = getClient(prefix);
+            await cacheClient.connect();
+
+            ingestTask = new IngestShard();
+            ingestTask._cache._cacheBackend._prefix = prefix;
+            console.log(ingestTask._cache._cacheBackend)
+            ingestTask._program = { lag: 0 };
+            await ingestTask._cache.connect();
+            console.log(ingestTask._cache._cacheBackend)
+        });
+
+        it('should get the current value from warp10 if cache is empty', async () => {
+            await Promise.all(events.map(ev => cacheClient.pushMetric(ev)));
+            await ingestTask.execute();
+
+            await async.eachOf(totals.accounts, async (total, acc) => {
+                const resp = await client.getStorage('accounts', acc);
+                console.log(total);
+                console.log(resp);
+            });
+        });
+    });
 });

@@ -19,13 +19,13 @@ describe('Test Warp Client', () => {
     });

     it('should ingest records', async () => {
-        const res = await warp10.ingest(className, testValues);
+        const res = await warp10.ingest({ className }, testValues);
         assert.strictEqual(res, testValues.length);
     });

     // TODO after the macro encoder is written this will need to be updated
     it('should fetch records', async () => {
-        await warp10.ingest(className, testValues);
+        await warp10.ingest({ className }, testValues);
         const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
         const parsed = JSON.parse(res.result[0])[0];
         assert.strictEqual(parsed.c, className);

@@ -7,14 +7,15 @@ const { models, constants } = require('arsenal');
 const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');

 const { ObjectMD } = models;

-const app = express();
-
 class BucketD {
-    constructor() {
+    constructor(isV2 = false) {
         this._server = null;
         this._bucketCount = 0;
         this._bucketContent = {};
         this._buckets = [];
+        this._isV2 = isV2;
+        this._app = express();
     }

     clearBuckets() {
@@ -61,14 +62,14 @@ class BucketD {
             CommonPrefixes: [],
         };
         const maxKeys = parseInt(req.query.maxKeys, 10);
-        if (req.query.marker) {
+        if (req.query.marker || req.query.gt) {
             body.IsTruncated = false;
             body.Contents = this._buckets.slice(maxKeys);
         } else {
             body.IsTruncated = maxKeys < this._bucketCount;
             body.Contents = this._buckets.slice(0, maxKeys);
         }
-        return JSON.stringify(body);
+        return body;
     }

     _getShadowBucketResponse(bucketName) {
@@ -77,7 +78,7 @@ class BucketD {
             IsTruncated: false,
             Contents: this._bucketContent[bucketName] || [],
         };
-        return JSON.stringify(body);
+        return body;
     }

     _getBucketResponse(bucketName) {
@@ -86,7 +87,7 @@ class BucketD {
             IsTruncated: false,
             Contents: this._bucketContent[bucketName] || [],
         };
-        return JSON.stringify(body);
+        return body;
     }

     _getShadowBucketOverviewResponse(bucketName) {
@@ -99,11 +100,11 @@ class BucketD {
             IsTruncated: false,
             Uploads: mpus,
         };
-        return JSON.stringify(body);
+        return body;
     }

     _initiateRoutes() {
-        app.param('bucketName', (req, res, next, bucketName) => {
+        this._app.param('bucketName', (req, res, next, bucketName) => {
             /* eslint-disable no-param-reassign */
             if (bucketName === constants.usersBucket) {
                 req.body = this._getUsersBucketResponse(req);
@@ -115,26 +116,55 @@ class BucketD {
             ) {
                 req.body = this._getBucketResponse(bucketName);
             }
+
+            // v2 reindex uses `Basic` listing type for everything
+            if (this._isV2) {
+                if (req.body && req.body.Contents) {
+                    req.body = req.body.Contents;
+                }
+            }
+
             /* eslint-enable no-param-reassign */
             next();
         });

-        app.get('/default/bucket/:bucketName', (req, res) => {
-            res.writeHead(200);
-            res.write(req.body);
-            res.end();
+        this._app.get('/default/attributes/:bucketName', (req, res) => {
+            const key = req.params.bucketName;
+            const bucket = this._bucketContent[key];
+            if (bucket) {
+                res.status(200).send({
+                    name: key,
+                    owner: CANONICAL_ID,
+                    ownerDisplayName: 'steve',
+                    creationDate: new Date(),
+                });
+                return;
+            }
+            res.statusMessage = 'DBNotFound';
+            res.status(404).end();
+        });
+
+        this._app.get('/default/bucket/:bucketName', (req, res) => {
+            res.status(200).send(req.body);
         });
     }

     start() {
         this._initiateRoutes();
         const port = 9000;
-        this._server = http.createServer(app).listen(port);
+        this._server = http.createServer(this._app).listen(port);
     }

     end() {
         this._server.close();
     }
+
+    reset() {
+        this._bucketCount = 0;
+        this._bucketContent = {};
+        this._buckets = [];
+    }
 }

 module.exports = BucketD;

@@ -36,13 +36,18 @@
 $operation_info 'labels' GET 'labels' STORE
 $operation_info 'start' GET TOLONG 'startTimestamp' STORE
 $operation_info 'end' GET TOLONG 'endTimestamp' STORE
+$operation_info 'node' GET 'nodeID' STORE

 // 'Fetching metrics for ' $labels ->JSON + LOGMSG
 // 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG

-$read_token $labels $startTimestamp @utapi/getMetricsAt 'startResults' STORE
-$read_token $labels $endTimestamp @utapi/getMetricsAt 'endResults' STORE
+{ 'labels' $labels 'node' $nodeID } 'opInfo' STORE
+
+$auth_info ->JSON $opInfo UNMAP 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'startResults' STORE
+$auth_info ->JSON $opInfo UNMAP 'end' $endTimestamp } ->JSON @utapi/getMetricsAt 'endResults' STORE
+
+// $read_token $labels $startTimestamp $nodeID @utapi/getMetricsAt 'startResults' STORE
+// $read_token $labels $endTimestamp $nodeID @utapi/getMetricsAt 'endResults' STORE

 // $startResults ->JSON LOGMSG
 // $endResults ->JSON LOGMSG

@@ -25,9 +25,21 @@
 <%
     'getMetricsAt' SECTION

-    'endTimestamp' STORE
-    'labels' STORE
-    'read_token' STORE
+    JSON-> 'operation_info' STORE
+    JSON-> 'auth_info' STORE
+
+    // $operation_info ->JSON LOGMSG
+
+    $auth_info 'read' GET 'read_token' STORE
+    $operation_info 'end' GET TOLONG 'endTimestamp' STORE
+    $operation_info 'labels' GET 'labels' STORE
+    $operation_info 'node' GET 'nodeID' STORE
+
+    // 'nodeID' STORE
+    // 'endTimestamp' STORE
+    // 'labels' STORE
+    // 'read_token' STORE

     // Raise the max operations for a executing script
     // $read_token AUTHENTICATE
@@ -37,6 +49,7 @@
     'utapi.checkpoint' 'checkpoint_class' STORE
     'utapi.snapshot' 'snapshot_class' STORE
     'utapi.repair.correction' 'correction_class' STORE
+    'utapi.repair.reindex' 'reindex_class' STORE

     {} 'snapshots' STORE
@@ -248,6 +261,25 @@
     %> FOREACH
 %> FOREACH

+'load_reindex' SECTION
+// Only load the latest reindex for the current node
+$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
+{
+    'token' $read_token
+    'class' $reindex_class
+    'labels' $filterLabels
+    'end' $endTimestamp
+    'count' 1
+} FETCH
+<% // Handle multiple GTS
+    VALUES
+    <% // For each reindex correction
+        @utapi/decodeRecord
+        // DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
+        $results @util/sumRecord 'results' STORE
+    %> FOREACH
+%> FOREACH
+
 $results // Leave results on the stack
 %>

@@ -0,0 +1,3 @@
<%
    EVAL @utapi/encodeRecord
%>

yarn.lock (730 lines changed)

File diff suppressed because it is too large.