Compare commits

..

No commits in common. "84cb2113c2148d0a4eac678970461efd160e92f9" and "17225ad8c7f15449b131ccdefd1102924159c6b7" have entirely different histories.

46 changed files with 129 additions and 1632 deletions

View File

@ -3,13 +3,5 @@
"rules": {
"no-underscore-dangle": "off",
"implicit-arrow-linebreak" : "off"
},
"settings": {
"import/resolver": {
"node": {
"paths": ["/backbeat/node_modules", "node_modules"]
}
}
}
}
}

View File

@ -1,14 +0,0 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({
task: 'Reindex',
});
const task = new tasks.ReindexTask();
task.setup()
.then(() => logger.info('Starting Reindex daemon'))
.then(() => task.start())
.then(() => logger.info('Reindex started'));

View File

@ -16,17 +16,14 @@ models:
type: kube_pod
path: eve/workers/pod.yml
images:
aggressor:
context: '.'
dockerfile: eve/workers/unit_and_feature_tests
aggressor: eve/workers/unit_and_feature_tests
warp10:
context: '.'
dockerfile: 'images/warp10/Dockerfile'
vault: eve/workers/mocks/vault
- Install: &install
name: install node modules
# command: yarn install --frozen-lockfile
command: echo '@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ Skipping Install @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@'
command: yarn install --frozen-lockfile
haltOnFailure: True
- Upload: &upload_artifacts
source: /artifacts
@ -79,10 +76,6 @@ stages:
- ShellCommand:
name: run client tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:client.log"
follow: true
run-server-tests:
worker: *workspace
steps:
@ -91,10 +84,6 @@ stages:
- ShellCommand:
name: run server tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:server.log"
follow: true
run-cron-tests:
worker: *workspace
steps:
@ -103,10 +92,6 @@ stages:
- ShellCommand:
name: run cron tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:cron.log"
follow: true
run-interval-tests:
worker: *workspace
steps:
@ -115,10 +100,6 @@ stages:
- ShellCommand:
name: run interval tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:interval.log"
follow: true
run-v2-functional-tests:
worker:
<< : *workspace
@ -142,7 +123,6 @@ stages:
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
logfiles:
warp10:
filename: "/artifacts/warp10.log"

View File

@ -3,18 +3,12 @@ FROM buildpack-deps:jessie-curl
#
# Install apt packages needed by utapi and buildbot_worker
#
ENV LANG C.UTF-8
ENV NODE_VERSION 10.22.0
ENV PATH=$PATH:/utapi/node_modules/.bin
ENV NODE_PATH=/utapi/node_modules
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
WORKDIR /utapi
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update -qq \
@ -25,17 +19,7 @@ RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x6
&& rm -f /etc/supervisor/conf.d/*.conf \
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
COPY package.json yarn.lock /utapi/
#
# Install yarn dependencies
#
COPY package.json yarn.lock /utapi/
RUN yarn cache clean \
&& yarn install --frozen-lockfile \
&& yarn cache clean
#
# Run buildbot-worker on startup through supervisor
#

View File

@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
SETUP_CMD="start"
fi
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
bash tests/utils/wait_for_local_port.bash $PORT 40
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"

View File

@ -490,7 +490,7 @@ class UtapiClient {
return done();
}),
// if cursor is 0, it reached end of scan
cb => cb(null, cursor === '0'),
() => cursor === '0',
err => callback(err, keys),
);
}

View File

@ -49,7 +49,6 @@ class MemoryCache {
async addToShard(shard, event) {
const metricKey = schema.getUtapiMetricKey(this._prefix, event);
console.log(`MEM: ${metricKey}`)
this._data[metricKey] = event;
if (this._shards[shard]) {
this._shards[shard].push(metricKey);

View File

@ -64,7 +64,6 @@ class RedisCache {
return logger
.logAsyncError(async () => {
const metricKey = schema.getUtapiMetricKey(this._prefix, metric);
console.log(metricKey)
const shardKey = schema.getShardKey(this._prefix, shard);
const shardMasterKey = schema.getShardMasterKey(this._prefix);
logger.debug('adding metric to shard', { metricKey, shardKey });

View File

@ -2,7 +2,6 @@ const { callbackify } = require('util');
const { Transform } = require('stream');
const uuid = require('uuid');
const needle = require('needle');
const aws4 = require('aws4');
// These modules are added via the `level-mem` package rather than individually
/* eslint-disable import/no-extraneous-dependencies */
@ -10,7 +9,7 @@ const levelup = require('levelup');
const memdown = require('memdown');
const encode = require('encoding-down');
const { UtapiMetric } = require('../models');
const { LoggerContext, asyncOrCallback } = require('../utils');
const { LoggerContext } = require('../utils');
/* eslint-enable import/no-extraneous-dependencies */
const moduleLogger = new LoggerContext({
@ -66,7 +65,6 @@ class Uploader extends Transform {
});
}
}
// const credentials = { accessKeyId, secretAccessKey, token };
class UtapiClient {
constructor(config) {
@ -79,10 +77,6 @@ class UtapiClient {
this._drainTimer = null;
this._drainCanSchedule = true;
this._drainDelay = (config && config.drainDelay) || 30000;
this._credentials = {
accessKeyId: (config && config.accessKeyId) || 'accessKey1',
secretAccessKey: (config && config.secretAccessKey) || 'verySecretKey1',
};
}
async join() {
@ -265,58 +259,6 @@ class UtapiClient {
}
return this._pushMetric(data);
}
async _signedRequest(method, url, options = {}) {
const _url = new URL(url);
const reqOptions = {
method,
hostname: _url.host,
service: 's3',
path: `${_url.pathname}${_url.search}`,
signQuery: false,
body: options.body,
};
const signedOptions = aws4.sign(reqOptions, this._credentials);
const args = [method, url];
if (options.body !== undefined) {
args.push(options.body);
}
args.push({
...options,
headers: signedOptions.headers,
});
return needle(...args);
}
/**
* Get the storageUtilized of a resource
*
* @param {string} level - level of metrics, curretly only 'accounts' is supported
* @param {string} resource - id of the resource
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - return a Promise if no callback is provided, undefined otherwise
*/
getStorage(level, resource, callback) {
if (level !== 'accounts') {
throw new Error('invalid level, only "accounts" is supported');
}
return asyncOrCallback(async () => {
const resp = await this._signedRequest(
'get',
`http://${this._host}:${this._port}/v2/storage/${level}/${resource}`,
);
if (resp.statusCode !== 200) {
throw new Error(`unable to retrieve metrics: ${resp.statusMessage}`);
}
return resp.body;
}, callback);
}
}
module.exports = UtapiClient;

View File

@ -30,10 +30,5 @@
"ingestionSchedule": "*/5 * * * * *",
"checkpointSchedule": "*/30 * * * * *",
"snapshotSchedule": "* 0 * * * *",
"repairSchedule": "* */5 * * * *",
"bucketd": [ "localhost:9000" ],
"reindex": {
"enabled": true,
"schedule": "0 0 0 * * 6"
}
"repairSchedule": "* */5 * * * *"
}

View File

@ -285,8 +285,6 @@ class Config {
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
};
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
return parsedConfig;
}

View File

@ -32,6 +32,7 @@ const warp10MultiHost = Joi.object({
writeToken: Joi.string(),
});
Joi.array().items(warp10SingleHost);
const tlsSchema = Joi.object({
key: Joi.string(),
@ -61,11 +62,6 @@ const schema = Joi.object({
host: Joi.string().hostname(),
port: Joi.number().port(),
}),
reindex: Joi.object({
enabled: Joi.boolean(),
schedule: Joi.string(),
}),
bucketd: Joi.array().items(Joi.string()),
expireMetrics: Joi.boolean(),
expireMetricsTTL: Joi.number(),
cacheBackend: Joi.string().valid('memory', 'redis'),

View File

@ -84,15 +84,13 @@ const constants = {
buckets: 'bck',
},
warp10EventType: ':m:utapi/event:',
warp10RecordType: ':m:utapi/record:',
warp10ValueType: ':m:utapi/event:',
truthy,
shardIngestLagSecs: 30,
checkpointLagSecs: 300,
snapshotLagSecs: 900,
repairLagSecs: 5,
counterBaseValueExpiration: 86400, // 24hrs
keyVersionSplitter: String.fromCharCode(0),
};
constants.operationToResponse = constants.operations

View File

@ -1,16 +0,0 @@
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
const bucketclient = require('bucketclient');
const config = require('../config');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const params = {
bucketdBootstrap: config.bucketd,
https: config.https,
};
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

View File

@ -1,116 +0,0 @@
/* eslint-disable no-restricted-syntax */
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal/lib/constants');
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const PAGE_SIZE = 1000;
function _listingWrapper(bucket, params) {
return new Promise(
(resolve, reject) => metadata.listObject(
bucket,
params,
logger.newRequestLogger(),
(err, res) => {
if (err) {
reject(err);
return;
}
resolve(res);
},
),
);
}
function _listObject(bucket, prefix, hydrateFunc) {
const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
let gt;
return {
async* [Symbol.asyncIterator]() {
while (true) {
let res;
try {
// eslint-disable-next-line no-await-in-loop
res = await _listingWrapper(bucket, { ...listingParams, gt });
} catch (error) {
moduleLogger.error('Error during listing', { error });
throw error;
}
for (const item of res) {
yield hydrateFunc ? hydrateFunc(item) : item;
}
if (res.length === 0 || res.length !== PAGE_SIZE) {
break;
}
gt = res[res.length - 1].key;
}
},
};
}
function listObjects(bucket) {
return _listObject(bucket, '', data => {
const { key, value } = data;
const [name, version] = key.split(keyVersionSplitter);
return {
name,
version,
value: JSON.parse(value),
};
});
}
function listBuckets() {
return _listObject(usersBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
async function listMPUs(bucket) {
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
return _listObject(mpuBucket, '', data => {
const { key, value } = data;
const [account, name] = key.split(mdKeySplitter);
return {
account,
name,
value: JSON.parse(value),
};
});
}
function bucketExists(bucket) {
return new Promise((resolve, reject) => metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(),
err => {
if (err && !err.NoSuchBucket) {
reject(err);
return;
}
resolve(err === null);
},
));
}
module.exports = {
listBuckets,
listObjects,
listMPUs,
bucketExists,
};

View File

@ -34,7 +34,7 @@ class RequestContext extends RequestContextModel {
const tag = request.swagger.operation['x-router-controller'];
const { operationId } = request.swagger.operation;
request.logger.logger.addDefaultFields({
request.logger.addDefaultFields({
tag,
operationId,
service: 'utapi',

View File

@ -1,12 +0,0 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const recordSchema = {
timestamp: Joi.number(),
objectDelta: Joi.number(),
sizeDelta: Joi.number(),
incomingBytes: Joi.number(),
outgoingBytes: Joi.number(),
};
module.exports = buildModel('UtapiRecord', recordSchema);

View File

@ -1,6 +1,5 @@
const BaseModel = require('./Base');
const UtapiMetric = require('./UtapiMetric');
const UtapiRecord = require('./UtapiRecord');
const RequestContext = require('./RequestContext');
const ResponseContainer = require('./ResponseContainer');
@ -9,5 +8,4 @@ module.exports = {
UtapiMetric,
RequestContext,
ResponseContainer,
UtapiRecord,
};

View File

@ -1,49 +0,0 @@
const errors = require('../../../errors');
const { serviceToWarp10Label } = require('../../../constants');
const { client: warp10 } = require('../../../warp10');
const { client: cache } = require('../../../cache');
const { now } = require('../../../utils');
const config = require('../../../config');
async function getStorage(ctx, params) {
const { level, resource } = params;
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
let storageUtilized;
if (base !== null) {
storageUtilized = counter + base;
} else {
const labelName = serviceToWarp10Label[params.level];
const labels = { [labelName]: resource };
const options = {
params: {
end: now(),
labels,
node: config.nodeId,
},
macro: 'utapi/getMetricsAt',
};
const res = await warp10.exec(options);
if (res.result.length === 0) {
ctx.logger.error('unable to retrieve metrics', { level, resource });
throw errors.InternalError;
}
const { sizeD: currentSize } = res.result[0];
await cache.updateAccountCounterBase(resource, currentSize);
storageUtilized = currentSize;
}
ctx.results.statusCode = 200;
ctx.results.body = {
storageUtilized,
resource,
level,
};
}
module.exports = getStorage;

View File

@ -2,7 +2,6 @@ const errors = require('../../../errors');
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
const { convertTimestamp } = require('../../../utils');
const { client: warp10 } = require('../../../warp10');
const config = require('../../../config');
const emptyOperationsResponse = Object.values(operationToResponse)
.reduce((prev, key) => {
@ -33,7 +32,6 @@ async function listMetric(ctx, params) {
start,
end,
labels,
node: config.nodeId,
},
macro: 'utapi/getMetrics',
};

View File

@ -126,20 +126,19 @@ class APIController {
static async _callOperation(handler, request, response, params) {
try {
await handler(request.ctx, params);
} catch (error) {
console.log(error)
request.logger.error('error during operation', { error });
throw error;
} catch (err) {
request.logger.error('error during operation', { err });
throw err;
}
request.logger.debug('writing operation result');
try {
await APIController._writeResult(request.ctx.results, response);
} catch (error) {
} catch (err) {
request.logger.error(
'error while writing operation result',
{ error },
{ err },
);
throw error;
throw err;
}
}

View File

@ -85,35 +85,15 @@ async function initializeOasTools(spec, app) {
async function authV4Middleware(request, response, params) {
const authHeader = request.headers.authorization;
if (!authHeader || !authHeader.startsWith('AWS4-')) {
request.logger.error('missing auth header for v4 auth');
request.log.error('missing auth header for v4 auth');
throw errors.InvalidRequest.customizeDescription('Must use Auth V4 for this request.');
}
let action;
let level;
let requestedResources = [];
if (params.level) {
// Can't destructure here
// eslint-disable-next-line prefer-destructuring
level = params.level;
requestedResources = [params.resource];
action = 'ListMetrics';
} else {
requestedResources = params.body[params.resource];
level = params.resource;
action = params.Action.value;
}
if (requestedResources.length === 0) {
throw errors.InvalidRequest.customizeDescription('You must specify at lest one resource');
}
let passed;
let authorizedResources;
try {
[passed, authorizedResources] = await authenticateRequest(request, action, level, requestedResources);
[passed, authorizedResources] = await authenticateRequest(request, params);
} catch (error) {
request.logger.error('error during authentication', { error });
throw errors.InternalError;
@ -124,8 +104,8 @@ async function authV4Middleware(request, response, params) {
throw errors.AccessDenied;
}
if (params.level === undefined && authorizedResources !== undefined) {
params.body[params.resource] = authorizedResources;
if (authorizedResources !== undefined) {
params.body[params.resource.value] = authorizedResources;
}
}

View File

@ -59,7 +59,7 @@ class IngestShardTask extends BaseTask {
return new UtapiMetric(metric);
});
}
const status = await this._warp10.ingest({ className: metricClass }, records);
const status = await this._warp10.ingest(metricClass, records);
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
} else {

View File

@ -1,177 +0,0 @@
/* eslint-disable no-restricted-syntax */
const async = require('async');
const { mpuBucketPrefix } = require('arsenal/lib/constants');
const BaseTask = require('./BaseTask');
const { UtapiRecord } = require('../models');
const config = require('../config');
const metadata = require('../metadata');
const { serviceToWarp10Label, warp10RecordType } = require('../constants');
const { LoggerContext, convertTimestamp } = require('../utils');
const logger = new LoggerContext({
module: 'ReindexTask',
});
class ReindexTask extends BaseTask {
constructor(options) {
super({
warp10: {
requestTimeout: 30000,
connectTimeout: 30000,
},
...options,
});
this._defaultSchedule = config.reindexSchedule;
this._defaultLag = 0;
}
static async _indexBucket(bucket) {
let size = 0;
let count = 0;
let lastMaster = null;
let lastMasterSize = null;
for await (const obj of metadata.listObjects(bucket)) {
if (obj.value.isDeleteMarker) {
// eslint-disable-next-line no-continue
continue;
}
count += 1;
size += obj.value['content-length'];
// If versioned, subtract the size of the master to avoid double counting
if (lastMaster && obj.name === lastMaster) {
logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
size -= lastMasterSize;
count -= 1;
lastMaster = null;
lastMasterSize = null;
// Only save master versions
} else if (!obj.version) {
lastMaster = obj.name;
lastMasterSize = obj.value['content-length'];
}
}
return { size, count };
}
static async _indexMpuBucket(bucket) {
if (await metadata.bucketExists(bucket)) {
return ReindexTask._indexBucket(bucket);
}
return { size: 0, count: 0 };
}
async _fetchCurrentMetrics(level, resource) {
const timestamp = convertTimestamp(new Date().getTime());
const options = {
params: {
start: timestamp,
end: timestamp,
node: this._nodeId,
labels: {
[level]: resource,
},
},
macro: 'utapi/getMetrics',
};
const res = await this._warp10.exec(options);
return { timestamp, value: JSON.parse(res.result[0]) };
}
async _updateMetric(level, resource, total) {
const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
const objectDelta = total.count - value.numberOfObjects[0];
const sizeDelta = total.size - value.storageUtilized[0];
if (objectDelta !== 0 || sizeDelta !== 0) {
logger.info('discrepancy detected in metrics, writing corrective record',
{ [level]: resource, objectDelta, sizeDelta });
const record = new UtapiRecord({
objectDelta,
sizeDelta,
timestamp,
});
await this._warp10.ingest(
{
className: 'utapi.repair.reindex',
labels: {
[level]: resource,
},
valueType: warp10RecordType,
},
[record],
);
}
}
async _execute() {
logger.debug('reindexing objects');
const accountTotals = {};
const ignoredAccounts = new Set();
await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
logger.trace('starting reindex of bucket', { bucket: bucket.name });
const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
let bktTotal;
let mpuTotal;
try {
bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
} catch (error) {
logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
ignoredAccounts.add(bucket.account);
return;
}
const total = {
size: bktTotal.size + mpuTotal.size,
count: bktTotal.count + mpuTotal.count,
};
if (accountTotals[bucket.account]) {
accountTotals[bucket.account].size += total.size;
accountTotals[bucket.account].count += total.count;
} else {
accountTotals[bucket.account] = { ...total };
}
logger.trace('finished indexing bucket', { bucket: bucket.name });
await Promise.all([
this._updateMetric(
serviceToWarp10Label.buckets,
bucket.name,
bktTotal,
),
this._updateMetric(
serviceToWarp10Label.buckets,
mpuBucket,
mpuTotal,
)]);
});
const toUpdate = Object.entries(accountTotals)
.filter(([account]) => !ignoredAccounts.has(account));
await async.eachLimit(toUpdate, 5, async ([account, total]) =>
this._updateMetric(
serviceToWarp10Label.accounts,
account,
total,
));
logger.debug('finished reindexing');
}
}
module.exports = ReindexTask;

View File

@ -3,7 +3,6 @@ const IngestShard = require('./IngestShard');
const CreateCheckpoint = require('./CreateCheckpoint');
const CreateSnapshot = require('./CreateSnapshot');
const RepairTask = require('./Repair');
const ReindexTask = require('./Reindex');
module.exports = {
IngestShard,
@ -11,5 +10,4 @@ module.exports = {
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
};

View File

@ -1,20 +0,0 @@
const { callbackify } = require('util');
/**
* Convenience function to handle "if no callback then return a promise" pattern
*
* @param {Function} asyncFunc - asyncFunction to call
* @param {Function|undefined} callback - optional callback
* @returns {Promise|undefined} - returns a Promise if no callback is passed
*/
function asyncOrCallback(asyncFunc, callback) {
if (typeof callback === 'function') {
callbackify(asyncFunc)(callback);
return undefined;
}
return asyncFunc();
}
module.exports = {
asyncOrCallback,
};

View File

@ -1,11 +1,9 @@
const log = require('./log');
const shard = require('./shard');
const timestamp = require('./timestamp');
const func = require('./func');
module.exports = {
...log,
...shard,
...timestamp,
...func,
};

View File

@ -11,67 +11,51 @@ werelogs.configure(loggerConfig);
const rootLogger = new werelogs.Logger('Utapi');
class LoggerContext {
constructor(defaults, logger = null) {
constructor(defaults) {
this._defaults = defaults;
this.logger = logger !== null ? logger : rootLogger;
}
get defaults() {
return this._defaults || {};
}
static expandError(data) {
if (data && data.error) {
return { ...data, errmsg: data.error.message, stack: data.error.stack };
}
return data;
}
_collectData(data) {
return { ...this.defaults, ...LoggerContext.expandError(data) };
}
with(extraDefaults) {
return new LoggerContext({ ...this.defaults, ...extraDefaults }, this.logger);
}
withLogger(logger) {
return new LoggerContext({ ...this.defaults }, logger);
return new LoggerContext({ ...this.defaults, ...extraDefaults });
}
info(msg, data = {}) {
return this.logger.info(msg, this._collectData(data));
return rootLogger.info(msg, { ...this.defaults, ...data });
}
debug(msg, data = {}) {
return this.logger.debug(msg, this._collectData(data));
return rootLogger.debug(msg, { ...this.defaults, ...data });
}
trace(msg, data = {}) {
return this.logger.trace(msg, this._collectData(data));
return rootLogger.trace(msg, { ...this.defaults, ...data });
}
warn(msg, data = {}) {
return this.logger.warn(msg, this._collectData(data));
return rootLogger.warn(msg, { ...this.defaults, ...data });
}
error(msg, data = {}) {
return this.logger.error(msg, this._collectData(data));
let _data = data;
if (data && data.error) {
_data = { ...data, errmsg: data.error.message, stack: data.error.stack };
}
return rootLogger.error(msg, { ...this.defaults, ..._data });
}
fatal(msg, data = {}) {
return this.logger.fatal(msg, this._collectData(data));
}
end(msg, data = {}) {
return this.logger.end(msg, this._collectData(data));
return rootLogger.fatal(msg, { ...this.defaults, ...data });
}
async logAsyncError(func, msg, data = {}) {
try {
return await func();
} catch (error) {
this.error(msg, { ...data, error });
this.error(msg, { error, ...data });
throw error;
}
}
@ -99,7 +83,7 @@ function buildRequestLogger(req) {
};
reqLogger.addDefaultFields(defaultInfo);
return new LoggerContext({}, reqLogger);
return reqLogger;
}
module.exports = {

View File

@ -32,18 +32,7 @@ class InterpolatedClock {
}
}
/**
* Returns the current time as
* the number of microseconds since the epoch
*
* @returns {Number} - current timestamp
*/
function now() {
return new Date().getTime() * 1000;
}
module.exports = {
convertTimestamp,
InterpolatedClock,
now,
};

View File

@ -85,19 +85,19 @@ class Vault {
const handler = new Vault(config);
auth.setHandler(handler);
async function authenticateRequest(request, action, level, resources) {
async function authenticateRequest(request, params) {
const policyContext = new policies.RequestContext(
request.headers,
level,
resources,
params.resource,
params.body[params.resource],
request.ip,
request.ctx.encrypted,
action,
params.Action.value,
'utapi',
);
return new Promise((resolve, reject) => {
auth.server.doAuth(request, request.logger.logger, (err, res) => {
auth.server.doAuth(request, request.logger, (err, res) => {
if (err && (err.InvalidAccessKeyId || err.AccessDenied)) {
resolve([false]);
return;

View File

@ -1,6 +1,5 @@
const { Warp10 } = require('@senx/warp10');
const assert = require('assert');
const { eventFieldsToWarp10, warp10EventType } = require('./constants');
const { eventFieldsToWarp10, warp10ValueType } = require('./constants');
const _config = require('./config');
const { LoggerContext } = require('./utils');
const errors = require('./errors');
@ -45,39 +44,29 @@ class Warp10Client {
// eslint-disable-next-line no-await-in-loop
return await func(client, ...params);
} catch (error) {
moduleLogger.warn('error during warp10 operation, failing over to next host',
{ statusCode: error.statusCode, statusMessage: error.statusMessage, error });
moduleLogger.warn('error during warp10 operation, failing over to next host', { error });
}
}
moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
throw errors.InternalError;
}
static _packEvent(valueType, event) {
static _packEvent(event) {
const packed = Object.entries(event.getValue())
.filter(([key]) => eventFieldsToWarp10[key])
.map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
.join(' ');
return `${valueType}{ ${packed} }`;
return `${warp10ValueType}{ ${packed} }`;
}
_buildGTSEntry(className, valueType, labels, event) {
const _labels = this._clients[0].formatLabels({ node: this._nodeId, ...labels });
const packed = Warp10Client._packEvent(valueType, event);
return `${event.timestamp}// ${className}${_labels} ${packed}`;
_buildGTSEntry(className, event) {
const labels = this._clients[0].formatLabels({ node: this._nodeId });
const packed = Warp10Client._packEvent(event);
return `${event.timestamp}// ${className}${labels} ${packed}`;
}
async _ingest(warp10, metadata, events) {
const { className, valueType, labels } = metadata;
assert.notStrictEqual(className, undefined, 'you must provide a className');
const payload = events.map(
ev => this._buildGTSEntry(
className,
valueType || warp10EventType,
labels || {},
ev,
),
);
async _ingest(warp10, className, events) {
const payload = events.map(ev => this._buildGTSEntry(className, ev));
const res = await warp10.update(this._writeToken, payload);
return res.count;
}

View File

@ -84,31 +84,7 @@ components:
type: string
message:
type: string
utapi-get-storage-v1:
description: storageUtilized for a single resource
content:
application/json:
schema:
type: object
required:
- storageUtilized
- resource
- level
properties:
storageUtilized:
type: integer
resource:
type: string
level:
type: string
parameters:
level:
in: path
name: level
required: true
schema:
type: string
enum: [ 'accounts' ]
resource:
in: path
name: resource
@ -149,20 +125,7 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/utapi-metric-v1'
/v2/storage/{level}/{resource}:
parameters:
- $ref: '#/components/parameters/resource'
- $ref: '#/components/parameters/level'
get:
x-router-controller: metrics
x-authv4: true
description: Get current storage utilized
operationId: getStorage
responses:
default:
$ref: '#/components/responses/json-error'
200:
$ref: '#/components/responses/utapi-get-storage-v1'
/{resource}:
parameters:
- $ref: '#/components/parameters/resource'

View File

@ -19,10 +19,9 @@
"dependencies": {
"@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.10",
"arsenal": "scality/Arsenal#aa9c9e5",
"async": "^3.2.0",
"arsenal": "scality/Arsenal#32c895b",
"async": "^2.0.1",
"body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient",
"commander": "^5.1.0",
"cron-parser": "^2.15.0",
"express": "^4.17.1",
@ -63,7 +62,6 @@
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
"start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
"start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
}

View File

@ -201,7 +201,7 @@ describe('UtapiReindex', () => {
shouldLeave = res === value;
return setTimeout(next, 200);
}),
next => next(null, shouldLeave), cb);
() => shouldLeave, cb);
}
function checkMetrics({ resource, expected }, cb) {

View File

@ -63,7 +63,7 @@ async function listMetrics(level, resources, start, end) {
}
async function ingestEvents(events) {
return events.length === await warp10.ingest({ className: 'utapi.event' }, events);
return events.length === await warp10.ingest('utapi.event', events);
}
function opsToResp(operations) {

View File

@ -59,7 +59,7 @@ describe('Test CreateCheckpoint', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(0));
const results = await warp10.fetch({
@ -88,8 +88,8 @@ describe('Test CreateCheckpoint', function () {
accounts,
);
await warp10.ingest({ className: 'utapi.event' }, historicalEvents);
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', historicalEvents);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-400));
await checkpointTask._execute(getTs(0));
@ -109,7 +109,7 @@ describe('Test CreateCheckpoint', function () {
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
);
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-100));
let results = await warp10.fetch({

View File

@ -67,7 +67,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-1));
await snapshotTask._execute(getTs(0));
@ -86,7 +86,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 500,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-400));
await checkpointTask._execute(getTs(-300));
await checkpointTask._execute(getTs(-200));
@ -110,7 +110,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 500,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-300));
await snapshotTask._execute(getTs(-250));
@ -132,11 +132,11 @@ describe('Test CreateSnapshot', function () {
const stop = getTs(-50);
const { events, totals } = generateCustomEvents(start, stop, 500, accounts);
await warp10.ingest({ className: 'utapi.event' }, events);
await warp10.ingest('utapi.event', events);
await checkpointTask._execute(getTs(-1));
const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
await warp10.ingest({ className: 'utapi.event' }, newEvents);
await warp10.ingest('utapi.event', newEvents);
await checkpointTask._execute(getTs(100));
await snapshotTask._execute(getTs(0));
@ -156,7 +156,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await warp10.ingest('utapi.repair.event', events);
await repairTask._execute(getTs(-1));
await snapshotTask._execute(getTs(0));

View File

@ -1,89 +0,0 @@
const assert = require('assert');
const uuid = require('uuid');
const { mpuBucketPrefix } = require('arsenal/lib/constants');
const { Warp10Client } = require('../../../../libV2/warp10');
const { ReindexTask } = require('../../../../libV2/tasks');
const { BucketD, values } = require('../../../utils/mock/');
const { protobuf } = require('../../../utils/v2Data');
const { CANONICAL_ID, BUCKET_NAME } = values;
const bucketCounts = [1, 1001];
const bucketRecord = {
ops: {},
sizeD: 1024,
objD: 1,
inB: 0,
outB: 0,
};
const accountRecord = {
ops: {},
sizeD: 2048,
objD: 2,
inB: 0,
outB: 0,
};
// eslint-disable-next-line func-names
describe('Test ReindexTask', function () {
this.timeout(1200000);
let prefix;
let warp10;
let reindexTask;
const bucketd = new BucketD(true);
before(() => bucketd.start());
beforeEach(() => {
prefix = uuid.v4();
reindexTask = new ReindexTask({ warp10: { nodeId: prefix } });
reindexTask._program = { nodeId: prefix };
warp10 = new Warp10Client({ nodeId: prefix });
});
afterEach(() => {
bucketd.reset();
});
async function assertResult(labels, value) {
const results = await warp10.fetch({
className: 'utapi.repair.reindex',
labels,
start: 'now',
stop: -100,
});
assert.strictEqual(results.result.length, 1);
assert.notStrictEqual(results.result[0], '');
const series = JSON.parse(results.result[0]);
assert.strictEqual(series.length, 1);
const record = protobuf.decode('Record', series[0].v[0][1]);
assert.deepStrictEqual(record, value);
}
bucketCounts.forEach(count => {
it(`should reindex bucket listing with a length of ${count}`, async () => {
const bucket = `${BUCKET_NAME}-${count}`;
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
bucketd
.setBucketContent({
bucketName: bucket,
contentLength: 1024,
})
.setBucketContent({
bucketName: mpuBucket,
contentLength: 1024,
})
.setBucketCount(count)
.createBuckets();
await reindexTask._execute();
await assertResult({ bck: bucket, node: prefix }, bucketRecord);
await assertResult({ bck: mpuBucket, node: prefix }, bucketRecord);
await assertResult({ acc: CANONICAL_ID, node: prefix }, accountRecord);
});
});
});

View File

@ -60,7 +60,7 @@ describe('Test Repair', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await warp10.ingest('utapi.repair.event', events);
await repairTask._execute(getTs(0));
const results = await warp10.fetch({
@ -89,8 +89,8 @@ describe('Test Repair', function () {
accounts,
);
await warp10.ingest({ className: 'utapi.repair.event' }, historicalEvents);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await warp10.ingest('utapi.repair.event', historicalEvents);
await warp10.ingest('utapi.repair.event', events);
await repairTask._execute(getTs(-400));
await repairTask._execute(getTs(0));
@ -110,7 +110,7 @@ describe('Test Repair', function () {
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await warp10.ingest('utapi.repair.event', events);
await repairTask._execute(getTs(-100));
let results = await warp10.fetch({

View File

@ -1,49 +1,24 @@
const assert = require('assert');
const async = require('async');
const sinon = require('sinon');
const uuid = require('uuid');
const { generateFakeEvents } = require('../../utils/v2Data');
const UtapiClient = require('../../../libV2/client');
const config = require('../../../libV2/config');
const { CacheClient, backends: cacheBackends } = require('../../../libV2/cache');
const { IngestShard } = require('../../../libV2/tasks');
const { generateCustomEvents } = require('../../utils/v2Data');
const getClient = prefix => new CacheClient({
cacheBackend: new cacheBackends.RedisCache(
config.cache,
prefix,
),
counterBackend: new cacheBackends.RedisCache(
config.cache,
prefix,
),
});
const { events, totals } = generateCustomEvents(1, 50, 50, {
[uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
});
const events = generateFakeEvents(1, 50, 50);
// Unskip after server side support is added
// eslint-disable-next-line func-names
describe('Test UtapiClient', function () {
describe.skip('Test UtapiClient', function () {
this.timeout(10000);
let client;
let sandbox;
let events;
let total;
beforeEach(() => {
sandbox = sinon.createSandbox();
client = new UtapiClient({
drainDelay: 5000,
});
const { events: _events, totals: _total } = generateCustomEvents(1, 50, 50, {
[uuid.v4()]: { [uuid.v4()]: [uuid.v4()] },
});
events = _events;
totals = _totals;
});
afterEach(() => {
@ -77,35 +52,5 @@ describe('Test UtapiClient', function () {
}, 6000);
});
});
describe.only('Test getStorage', () => {
let prefix;
let cacheClient;
let ingestTask;
beforeEach(async () => {
prefix = uuid.v4();
cacheClient = getClient(prefix);
await cacheClient.connect();
ingestTask = new IngestShard();
ingestTask._cache._cacheBackend._prefix = prefix;
console.log(ingestTask._cache._cacheBackend)
ingestTask._program = { lag: 0 };
await ingestTask._cache.connect();
console.log(ingestTask._cache._cacheBackend)
});
it('should get the current value from warp10 if cache is empty', async () => {
await Promise.all(events.map(ev => cacheClient.pushMetric(ev)));
await ingestTask.execute();
await async.eachOf(totals.accounts, async (total, acc) => {
const resp = await client.getStorage('accounts', acc);
console.log(total);
console.log(resp);
});
});
});
});

View File

@ -19,13 +19,13 @@ describe('Test Warp Client', () => {
});
it('should ingest records', async () => {
const res = await warp10.ingest({ className }, testValues);
const res = await warp10.ingest(className, testValues);
assert.strictEqual(res, testValues.length);
});
// TODO after the macro encoder is written this will need to be updated
it('should fetch records', async () => {
await warp10.ingest({ className }, testValues);
await warp10.ingest(className, testValues);
const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
const parsed = JSON.parse(res.result[0])[0];
assert.strictEqual(parsed.c, className);

View File

@ -7,15 +7,14 @@ const { models, constants } = require('arsenal');
const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
const { ObjectMD } = models;
const app = express();
class BucketD {
constructor(isV2 = false) {
constructor() {
this._server = null;
this._bucketCount = 0;
this._bucketContent = {};
this._buckets = [];
this._isV2 = isV2;
this._app = express();
}
clearBuckets() {
@ -62,14 +61,14 @@ class BucketD {
CommonPrefixes: [],
};
const maxKeys = parseInt(req.query.maxKeys, 10);
if (req.query.marker || req.query.gt) {
if (req.query.marker) {
body.IsTruncated = false;
body.Contents = this._buckets.slice(maxKeys);
} else {
body.IsTruncated = maxKeys < this._bucketCount;
body.Contents = this._buckets.slice(0, maxKeys);
}
return body;
return JSON.stringify(body);
}
_getShadowBucketResponse(bucketName) {
@ -78,7 +77,7 @@ class BucketD {
IsTruncated: false,
Contents: this._bucketContent[bucketName] || [],
};
return body;
return JSON.stringify(body);
}
_getBucketResponse(bucketName) {
@ -87,7 +86,7 @@ class BucketD {
IsTruncated: false,
Contents: this._bucketContent[bucketName] || [],
};
return body;
return JSON.stringify(body);
}
_getShadowBucketOverviewResponse(bucketName) {
@ -100,11 +99,11 @@ class BucketD {
IsTruncated: false,
Uploads: mpus,
};
return body;
return JSON.stringify(body);
}
_initiateRoutes() {
this._app.param('bucketName', (req, res, next, bucketName) => {
app.param('bucketName', (req, res, next, bucketName) => {
/* eslint-disable no-param-reassign */
if (bucketName === constants.usersBucket) {
req.body = this._getUsersBucketResponse(req);
@ -116,55 +115,26 @@ class BucketD {
) {
req.body = this._getBucketResponse(bucketName);
}
// v2 reindex uses `Basic` listing type for everything
if (this._isV2) {
if (req.body && req.body.Contents) {
req.body = req.body.Contents;
}
}
/* eslint-enable no-param-reassign */
next();
});
this._app.get('/default/attributes/:bucketName', (req, res) => {
const key = req.params.bucketName;
const bucket = this._bucketContent[key];
if (bucket) {
res.status(200).send({
name: key,
owner: CANONICAL_ID,
ownerDisplayName: 'steve',
creationDate: new Date(),
});
return;
}
res.statusMessage = 'DBNotFound';
res.status(404).end();
});
this._app.get('/default/bucket/:bucketName', (req, res) => {
res.status(200).send(req.body);
app.get('/default/bucket/:bucketName', (req, res) => {
res.writeHead(200);
res.write(req.body);
res.end();
});
}
start() {
this._initiateRoutes();
const port = 9000;
this._server = http.createServer(this._app).listen(port);
this._server = http.createServer(app).listen(port);
}
end() {
this._server.close();
}
reset() {
this._bucketCount = 0;
this._bucketContent = {};
this._buckets = [];
}
}
module.exports = BucketD;

View File

@ -36,18 +36,13 @@
$operation_info 'labels' GET 'labels' STORE
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'node' GET 'nodeID' STORE
// 'Fetching metrics for ' $labels ->JSON + LOGMSG
// 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG
{ 'labels' $labels 'node' $nodeID } 'opInfo' STORE
$read_token $labels $startTimestamp @utapi/getMetricsAt 'startResults' STORE
$auth_info ->JSON $opInfo UNMAP 'end' $startTimestamp } ->JSON @utapi/getMetricsAt 'startResults' STORE
$auth_info ->JSON $opInfo UNMAP 'end' $endTimestamp } ->JSON @utapi/getMetricsAt 'endResults' STORE
// $read_token $labels $startTimestamp $nodeID @utapi/getMetricsAt 'startResults' STORE
// $read_token $labels $endTimestamp $nodeID @utapi/getMetricsAt 'endResults' STORE
$read_token $labels $endTimestamp @utapi/getMetricsAt 'endResults' STORE
// $startResults ->JSON LOGMSG
// $endResults ->JSON LOGMSG

View File

@ -25,21 +25,9 @@
<%
'getMetricsAt' SECTION
JSON-> 'operation_info' STORE
JSON-> 'auth_info' STORE
// $operation_info ->JSON LOGMSG
$auth_info 'read' GET 'read_token' STORE
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'labels' GET 'labels' STORE
$operation_info 'node' GET 'nodeID' STORE
// 'nodeID' STORE
// 'endTimestamp' STORE
// 'labels' STORE
// 'read_token' STORE
'endTimestamp' STORE
'labels' STORE
'read_token' STORE
// Raise the max operations for a executing script
// $read_token AUTHENTICATE
@ -49,7 +37,6 @@
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.reindex' 'reindex_class' STORE
{} 'snapshots' STORE
@ -261,25 +248,6 @@
%> FOREACH
%> FOREACH
'load_reindex' SECTION
// Only load the latest reindex for the current node
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
{
'token' $read_token
'class' $reindex_class
'labels' $filterLabels
'end' $endTimestamp
'count' 1
} FETCH
<% // Handle multiple GTS
VALUES
<% // For each reindex correction
@utapi/decodeRecord
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
$results @util/sumRecord 'results' STORE
%> FOREACH
%> FOREACH
$results // Leave results on the stack
%>

View File

@ -1,3 +0,0 @@
<%
EVAL @utapi/encodeRecord
%>

730
yarn.lock

File diff suppressed because it is too large Load Diff