Compare commits

..

8 Commits

Author SHA1 Message Date
Taylor McKinnon 438a5fb2f2 fix indentation 2020-09-16 13:40:42 -07:00
Taylor McKinnon a2fc75f72e remove vestigal install stage 2020-09-16 13:35:01 -07:00
Taylor McKinnon cc9420caee f 2020-09-16 13:29:35 -07:00
Taylor McKinnon 53cf86fef9 fix 2020-09-16 13:25:19 -07:00
Taylor McKinnon b2b21e8da5 fix image 2020-09-16 13:11:37 -07:00
Taylor McKinnon 345df46708 fix path 2020-09-16 13:08:22 -07:00
Taylor McKinnon 267448abd5 cache deps 2020-09-16 13:07:02 -07:00
Taylor McKinnon 1f15ab8923 ft(S3C-3286): Add reindex task 2020-09-16 10:14:53 -07:00
33 changed files with 1305 additions and 103 deletions

View File

@ -1,4 +1,3 @@
node_modules/
**/node_modules/
.git
eve/

View File

@ -3,5 +3,13 @@
"rules": {
"no-underscore-dangle": "off",
"implicit-arrow-linebreak" : "off"
}
},
"settings": {
"import/resolver": {
"node": {
"paths": ["/backbeat/node_modules", "node_modules"]
}
}
}
}

14
bin/reindex.js Normal file
View File

@ -0,0 +1,14 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');

// Task-scoped logger; every line emitted below carries `task: 'Reindex'`.
const logger = new LoggerContext({
    task: 'Reindex',
});

const task = new tasks.ReindexTask();

// Bring the task up, then start its schedule. The original chain had no
// rejection handler, so a failure in setup()/start() surfaced only as an
// unhandled promise rejection; exit non-zero instead so a supervisor
// (e.g. supervisord) can detect the failure and restart the daemon.
task.setup()
    .then(() => logger.info('Starting Reindex daemon'))
    .then(() => task.start())
    .then(() => logger.info('Reindex started'))
    .catch(error => {
        logger.error('error starting Reindex daemon', { error });
        process.exit(1);
    });

View File

@ -16,15 +16,13 @@ models:
type: kube_pod
path: eve/workers/pod.yml
images:
aggressor: eve/workers/unit_and_feature_tests
aggressor:
context: '.'
dockerfile: eve/workers/unit_and_feature_tests/Dockerfile
warp10:
context: '.'
dockerfile: 'images/warp10/Dockerfile'
vault: eve/workers/mocks/vault
- Install: &install
name: install node modules
command: yarn install --frozen-lockfile
haltOnFailure: True
- Upload: &upload_artifacts
source: /artifacts
urls:
@ -53,7 +51,6 @@ stages:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run static analysis tools on markdown
command: yarn run lint_md
@ -64,7 +61,6 @@ stages:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run unit tests
command: yarn test
@ -72,34 +68,46 @@ stages:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run client tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:client.log"
follow: true
run-server-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run server tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:server.log"
follow: true
run-cron-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run cron tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:cron.log"
follow: true
run-interval-tests:
worker: *workspace
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: run interval tests
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
logfiles:
utapi:
filename: "/artifacts/setup_ft_test:interval.log"
follow: true
run-v2-functional-tests:
worker:
<< : *workspace
@ -107,7 +115,6 @@ stages:
vault: enabled
steps:
- Git: *clone
- ShellCommand: *install
- ShellCommand:
name: Wait for Warp 10
command: |
@ -123,6 +130,7 @@ stages:
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
env:
UTAPI_CACHE_BACKEND: redis
UTAPI_LOG_LEVEL: trace
logfiles:
warp10:
filename: "/artifacts/warp10.log"

View File

@ -3,12 +3,18 @@ FROM buildpack-deps:jessie-curl
#
# Install apt packages needed by utapi and buildbot_worker
#
ENV LANG C.UTF-8
ENV NODE_VERSION 10.22.0
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
ENV PATH=$PATH:/utapi/node_modules/.bin
ENV NODE_PATH=/utapi/node_modules
COPY eve/workers/unit_and_feature_tests/utapi_packages.list eve/workers/unit_and_feature_tests/buildbot_worker_packages.list /tmp/
WORKDIR /utapi
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
&& apt-get update -qq \
@ -19,7 +25,15 @@ RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x6
&& rm -f /etc/supervisor/conf.d/*.conf \
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
#
# Install yarn dependencies
#
COPY package.json yarn.lock /utapi/
RUN yarn cache clean \
&& yarn install --frozen-lockfile \
&& yarn cache clean
#
# Run buildbot-worker on startup through supervisor
#
@ -28,7 +42,7 @@ ARG BUILDBOT_VERSION
RUN pip install buildbot-worker==$BUILDBOT_VERSION
RUN pip3 install requests
RUN pip3 install redis
ADD supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
ADD redis/sentinel.conf /etc/sentinel.conf
ADD eve/workers/unit_and_feature_tests/supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
ADD eve/workers/unit_and_feature_tests/redis/sentinel.conf /etc/sentinel.conf
CMD ["supervisord", "-n"]

View File

@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
SETUP_CMD="start"
fi
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
bash tests/utils/wait_for_local_port.bash $PORT 40
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"

View File

@ -490,7 +490,7 @@ class UtapiClient {
return done();
}),
// if cursor is 0, it reached end of scan
() => cursor === '0',
cb => cb(null, cursor === '0'),
err => callback(err, keys),
);
}

View File

@ -26,5 +26,10 @@
"ingestionSchedule": "*/5 * * * * *",
"checkpointSchedule": "*/30 * * * * *",
"snapshotSchedule": "* 0 * * * *",
"repairSchedule": "* */5 * * * *"
"repairSchedule": "* */5 * * * *",
"bucketd": [ "localhost:9000" ],
"reindex": {
"enabled": true,
"schedule": "0 0 0 * * 6"
}
}

View File

@ -248,6 +248,8 @@ class Config {
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
};
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
return parsedConfig;
}

View File

@ -32,7 +32,6 @@ const warp10MultiHost = Joi.object({
writeToken: Joi.string(),
});
Joi.array().items(warp10SingleHost);
const schema = Joi.object({
host: Joi.string(),
@ -55,6 +54,11 @@ const schema = Joi.object({
host: Joi.string().hostname(),
port: Joi.number().port(),
}),
reindex: Joi.object({
enabled: Joi.boolean(),
schedule: Joi.string(),
}),
bucketd: Joi.array().items(Joi.string()),
expireMetrics: Joi.boolean(),
expireMetricsTTL: Joi.number(),
cacheBackend: Joi.string().valid('memory', 'redis'),

View File

@ -84,12 +84,14 @@ const constants = {
buckets: 'bck',
},
warp10ValueType: ':m:utapi/event:',
warp10EventType: ':m:utapi/event:',
warp10RecordType: ':m:utapi/record:',
truthy,
shardIngestLagSecs: 30,
checkpointLagSecs: 300,
snapshotLagSecs: 900,
repairLagSecs: 5,
keyVersionSplitter: String.fromCharCode(0),
};
constants.operationToResponse = constants.operations

16
libV2/metadata/client.js Normal file
View File

@ -0,0 +1,16 @@
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
const bucketclient = require('bucketclient');
const config = require('../config');
const { LoggerContext } = require('../utils');
const moduleLogger = new LoggerContext({
module: 'metadata.client',
});
const params = {
bucketdBootstrap: config.bucketd,
https: config.https,
};
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);

116
libV2/metadata/index.js Normal file
View File

@ -0,0 +1,116 @@
/* eslint-disable no-restricted-syntax */
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal').constants;
const metadata = require('./client');
const { LoggerContext, logger } = require('../utils');
const { keyVersionSplitter } = require('../constants');
// NOTE: the original label duplicated `metadata.client` from ./client.js,
// which made log lines from the two modules indistinguishable; use a
// distinct label for this listing-helper module.
const moduleLogger = new LoggerContext({
    module: 'metadata',
});

// Page size requested from bucketd per listing call; a returned page
// shorter than this signals the end of the listing.
const PAGE_SIZE = 1000;
/**
 * Promise adapter around the callback-based `metadata.listObject`.
 *
 * @param {string} bucket - bucket to list
 * @param {object} params - listing parameters forwarded verbatim to bucketd
 * @returns {Promise<object>} resolves with the raw listing response
 */
function _listingWrapper(bucket, params) {
    return new Promise((resolve, reject) => {
        const reqLogger = logger.newRequestLogger();
        metadata.listObject(bucket, params, reqLogger, (err, res) => {
            if (err) {
                reject(err);
            } else {
                resolve(res);
            }
        });
    });
}
/**
 * Build an async-iterable over every entry of `bucket` under `prefix`,
 * fetching PAGE_SIZE entries per request and resuming each page after the
 * last key seen (`gt` cursor).
 *
 * @param {string} bucket - bucket to list
 * @param {string} prefix - key prefix filter ('' lists everything)
 * @param {function} [hydrateFunc] - optional per-entry transform applied
 *   before yielding; entries are yielded raw when omitted
 * @returns {object} object implementing `Symbol.asyncIterator`
 */
function _listObject(bucket, prefix, hydrateFunc) {
    const baseParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
    let lastKey;

    return {
        async* [Symbol.asyncIterator]() {
            for (;;) {
                let page;
                try {
                    // eslint-disable-next-line no-await-in-loop
                    page = await _listingWrapper(bucket, { ...baseParams, gt: lastKey });
                } catch (error) {
                    moduleLogger.error('Error during listing', { error });
                    throw error;
                }

                for (const entry of page) {
                    yield hydrateFunc ? hydrateFunc(entry) : entry;
                }

                // A short page means the listing is exhausted.
                if (page.length !== PAGE_SIZE) {
                    return;
                }
                lastKey = page[page.length - 1].key;
            }
        },
    };
}
/**
 * List every object entry in `bucket`, splitting each metadata key into
 * object name and version id and parsing the stored JSON value.
 *
 * @param {string} bucket - bucket to list
 * @returns {object} async-iterable of { name, version, value }
 */
function listObjects(bucket) {
    const hydrate = ({ key, value }) => {
        const [name, version] = key.split(keyVersionSplitter);
        return {
            name,
            version,
            value: JSON.parse(value),
        };
    };
    return _listObject(bucket, '', hydrate);
}
/**
 * List every bucket by iterating the special users bucket, splitting each
 * key into owning account and bucket name and parsing the stored value.
 *
 * @returns {object} async-iterable of { account, name, value }
 */
function listBuckets() {
    const hydrate = ({ key, value }) => {
        const [account, name] = key.split(mdKeySplitter);
        return {
            account,
            name,
            value: JSON.parse(value),
        };
    };
    return _listObject(usersBucket, '', hydrate);
}
/**
 * List in-progress multipart uploads for `bucket` via its shadow MPU
 * bucket (`mpuBucketPrefix` + bucket name).
 *
 * Kept `async` (resolving to the iterable) to preserve the original
 * call contract.
 *
 * @param {string} bucket - data bucket whose MPUs to list
 * @returns {Promise<object>} async-iterable of { account, name, value }
 */
async function listMPUs(bucket) {
    const hydrate = ({ key, value }) => {
        const [account, name] = key.split(mdKeySplitter);
        return {
            account,
            name,
            value: JSON.parse(value),
        };
    };
    return _listObject(`${mpuBucketPrefix}${bucket}`, '', hydrate);
}
function bucketExists(bucket) {
return new Promise((resolve, reject) => metadata.getBucketAttributes(
bucket,
logger.newRequestLogger(),
err => {
if (err && !err.NoSuchBucket) {
reject(err);
return;
}
resolve(err === null);
},
));
}
module.exports = {
listBuckets,
listObjects,
listMPUs,
bucketExists,
};

View File

@ -0,0 +1,12 @@
const Joi = require('@hapi/joi');
const { buildModel } = require('./Base');
const recordSchema = {
timestamp: Joi.number(),
objectDelta: Joi.number(),
sizeDelta: Joi.number(),
incomingBytes: Joi.number(),
outgoingBytes: Joi.number(),
};
module.exports = buildModel('UtapiRecord', recordSchema);

View File

@ -1,5 +1,6 @@
const BaseModel = require('./Base');
const UtapiMetric = require('./UtapiMetric');
const UtapiRecord = require('./UtapiRecord');
const RequestContext = require('./RequestContext');
const ResponseContainer = require('./ResponseContainer');
@ -8,4 +9,5 @@ module.exports = {
UtapiMetric,
RequestContext,
ResponseContainer,
UtapiRecord,
};

View File

@ -2,6 +2,7 @@ const errors = require('../../../errors');
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
const { convertTimestamp } = require('../../../utils');
const { client: warp10 } = require('../../../warp10');
const config = require('../../../config');
const emptyOperationsResponse = Object.values(operationToResponse)
.reduce((prev, key) => {
@ -32,6 +33,7 @@ async function listMetric(ctx, params) {
start,
end,
labels,
node: config.nodeId,
},
macro: 'utapi/getMetrics',
};

View File

@ -59,7 +59,7 @@ class IngestShardTask extends BaseTask {
return new UtapiMetric(metric);
});
}
const status = await this._warp10.ingest(metricClass, records);
const status = await this._warp10.ingest({ className: metricClass }, records);
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
} else {

177
libV2/tasks/Reindex.js Normal file
View File

@ -0,0 +1,177 @@
/* eslint-disable no-restricted-syntax */
const async = require('async');
const { mpuBucketPrefix } = require('arsenal').constants;
const BaseTask = require('./BaseTask');
const { UtapiRecord } = require('../models');
const config = require('../config');
const metadata = require('../metadata');
const { serviceToWarp10Label, warp10RecordType } = require('../constants');
const { LoggerContext, convertTimestamp } = require('../utils');
// Logger shared by all ReindexTask methods, including the static helpers.
const logger = new LoggerContext({
    module: 'ReindexTask',
});

/**
 * Scheduled task that recomputes per-bucket and per-account totals
 * directly from bucket metadata, compares them with the metrics currently
 * reported by warp10, and writes a corrective `utapi.repair.reindex`
 * record for every discrepancy found.
 */
class ReindexTask extends BaseTask {
    constructor(options) {
        super({
            // Full listings can be slow; use generous warp10 timeouts.
            warp10: {
                requestTimeout: 30000,
                connectTimeout: 30000,
            },
            ...options,
        });
        // NOTE(review): `config.reindexSchedule` is not visible in the
        // Config changes shown here — confirm it is derived from the
        // `reindex.schedule` config key.
        this._defaultSchedule = config.reindexSchedule;
        // Reindex works on current state; no ingestion lag applies.
        this._defaultLag = 0;
    }

    /**
     * Compute total storage size and object count for `bucket` by listing
     * every key. Delete markers are skipped. When a versioned key is seen
     * for the most recently saved master key, the master's size and count
     * are subtracted to avoid double counting that object.
     *
     * @param {string} bucket - bucket to index
     * @returns {Promise<{size: number, count: number}>} computed totals
     */
    static async _indexBucket(bucket) {
        let size = 0;
        let count = 0;
        let lastMaster = null;
        let lastMasterSize = null;
        for await (const obj of metadata.listObjects(bucket)) {
            if (obj.value.isDeleteMarker) {
                // eslint-disable-next-line no-continue
                continue;
            }
            count += 1;
            size += obj.value['content-length'];
            // If versioned, subtract the size of the master to avoid double counting
            if (lastMaster && obj.name === lastMaster) {
                logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
                size -= lastMasterSize;
                count -= 1;
                lastMaster = null;
                lastMasterSize = null;
            // Only save master versions
            } else if (!obj.version) {
                lastMaster = obj.name;
                lastMasterSize = obj.value['content-length'];
            }
        }
        return { size, count };
    }

    /**
     * Index a shadow MPU bucket, returning zero totals when the shadow
     * bucket does not exist (i.e. no multipart uploads in progress).
     *
     * @param {string} bucket - full shadow bucket name (prefix included)
     * @returns {Promise<{size: number, count: number}>} computed totals
     */
    static async _indexMpuBucket(bucket) {
        if (await metadata.bucketExists(bucket)) {
            return ReindexTask._indexBucket(bucket);
        }
        return { size: 0, count: 0 };
    }

    /**
     * Fetch the metrics warp10 currently reports for one resource at this
     * instant, via the `utapi/getMetrics` macro scoped to this node.
     *
     * @param {string} level - warp10 label name ('bck', 'acc', ...)
     * @param {string} resource - label value (bucket/account id)
     * @returns {Promise<{timestamp: number, value: object}>} query time and
     *   parsed metric document
     */
    async _fetchCurrentMetrics(level, resource) {
        const timestamp = convertTimestamp(new Date().getTime());
        const options = {
            params: {
                start: timestamp,
                end: timestamp,
                node: this._nodeId,
                labels: {
                    [level]: resource,
                },
            },
            macro: 'utapi/getMetrics',
        };
        const res = await this._warp10.exec(options);
        return { timestamp, value: JSON.parse(res.result[0]) };
    }

    /**
     * Compare the freshly computed `total` against the reported metrics
     * and, when they differ, ingest a corrective record whose deltas bring
     * the reported values in line with the computed ones.
     *
     * @param {string} level - warp10 label name for the resource
     * @param {string} resource - label value (bucket/account id)
     * @param {{size: number, count: number}} total - computed totals
     * @returns {Promise<void>} resolves once any correction is written
     */
    async _updateMetric(level, resource, total) {
        const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
        const objectDelta = total.count - value.numberOfObjects[0];
        const sizeDelta = total.size - value.storageUtilized[0];
        if (objectDelta !== 0 || sizeDelta !== 0) {
            logger.info('discrepancy detected in metrics, writing corrective record',
                { [level]: resource, objectDelta, sizeDelta });
            const record = new UtapiRecord({
                objectDelta,
                sizeDelta,
                timestamp,
            });
            await this._warp10.ingest(
                {
                    className: 'utapi.repair.reindex',
                    labels: {
                        [level]: resource,
                    },
                    valueType: warp10RecordType,
                },
                [record],
            );
        }
    }

    /**
     * Main entry point: walk all buckets (5 concurrently), index each
     * bucket and its shadow MPU bucket with retries, write per-bucket
     * corrections, aggregate account totals, then write per-account
     * corrections — skipping any account that had a failed bucket so its
     * totals are not corrected from partial data.
     */
    async _execute() {
        logger.debug('reindexing objects');
        const accountTotals = {};
        const ignoredAccounts = new Set();
        // NOTE(review): listBuckets() returns an async iterable; assumes
        // async v3 eachLimit accepts async iterables — confirm.
        await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
            logger.trace('starting reindex of bucket', { bucket: bucket.name });
            const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
            let bktTotal;
            let mpuTotal;
            try {
                bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
                mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
            } catch (error) {
                logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
                ignoredAccounts.add(bucket.account);
                return;
            }
            const total = {
                size: bktTotal.size + mpuTotal.size,
                count: bktTotal.count + mpuTotal.count,
            };
            if (accountTotals[bucket.account]) {
                accountTotals[bucket.account].size += total.size;
                accountTotals[bucket.account].count += total.count;
            } else {
                accountTotals[bucket.account] = { ...total };
            }
            logger.trace('finished indexing bucket', { bucket: bucket.name });
            // The MPU shadow bucket is corrected under the bucket-level
            // label as well, keyed by its full shadow name.
            await Promise.all([
                this._updateMetric(
                    serviceToWarp10Label.buckets,
                    bucket.name,
                    bktTotal,
                ),
                this._updateMetric(
                    serviceToWarp10Label.buckets,
                    mpuBucket,
                    mpuTotal,
                )]);
        });
        const toUpdate = Object.entries(accountTotals)
            .filter(([account]) => !ignoredAccounts.has(account));
        await async.eachLimit(toUpdate, 5, async ([account, total]) =>
            this._updateMetric(
                serviceToWarp10Label.accounts,
                account,
                total,
            ));
        logger.debug('finished reindexing');
    }
}

module.exports = ReindexTask;

View File

@ -3,6 +3,7 @@ const IngestShard = require('./IngestShard');
const CreateCheckpoint = require('./CreateCheckpoint');
const CreateSnapshot = require('./CreateSnapshot');
const RepairTask = require('./Repair');
const ReindexTask = require('./Reindex');
module.exports = {
IngestShard,
@ -10,4 +11,5 @@ module.exports = {
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
};

View File

@ -1,5 +1,6 @@
const { Warp10 } = require('@senx/warp10');
const { eventFieldsToWarp10, warp10ValueType } = require('./constants');
const assert = require('assert');
const { eventFieldsToWarp10, warp10EventType } = require('./constants');
const _config = require('./config');
const { LoggerContext } = require('./utils');
const errors = require('./errors');
@ -44,29 +45,39 @@ class Warp10Client {
// eslint-disable-next-line no-await-in-loop
return await func(client, ...params);
} catch (error) {
moduleLogger.warn('error during warp10 operation, failing over to next host', { error });
moduleLogger.warn('error during warp10 operation, failing over to next host',
{ statusCode: error.statusCode, statusMessage: error.statusMessage });
}
}
moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
throw errors.InternalError;
}
static _packEvent(event) {
static _packEvent(valueType, event) {
const packed = Object.entries(event.getValue())
.filter(([key]) => eventFieldsToWarp10[key])
.map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
.join(' ');
return `${warp10ValueType}{ ${packed} }`;
return `${valueType}{ ${packed} }`;
}
_buildGTSEntry(className, event) {
const labels = this._clients[0].formatLabels({ node: this._nodeId });
const packed = Warp10Client._packEvent(event);
return `${event.timestamp}// ${className}${labels} ${packed}`;
_buildGTSEntry(className, valueType, labels, event) {
const _labels = this._clients[0].formatLabels({ node: this._nodeId, ...labels });
const packed = Warp10Client._packEvent(valueType, event);
return `${event.timestamp}// ${className}${_labels} ${packed}`;
}
async _ingest(warp10, className, events) {
const payload = events.map(ev => this._buildGTSEntry(className, ev));
async _ingest(warp10, metadata, events) {
const { className, valueType, labels } = metadata;
assert.notStrictEqual(className, undefined, 'you must provide a className');
const payload = events.map(
ev => this._buildGTSEntry(
className,
valueType || warp10EventType,
labels || {},
ev,
),
);
const res = await warp10.update(this._writeToken, payload);
return res.count;
}

View File

@ -19,9 +19,10 @@
"dependencies": {
"@hapi/joi": "^17.1.1",
"@senx/warp10": "^1.0.10",
"arsenal": "scality/Arsenal#32c895b",
"async": "^2.0.1",
"arsenal": "scality/Arsenal#aa9c9e5",
"async": "^3.2.0",
"body-parser": "^1.19.0",
"bucketclient": "scality/bucketclient",
"commander": "^5.1.0",
"cron-parser": "^2.15.0",
"express": "^4.17.1",
@ -62,6 +63,7 @@
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
"start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
"start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
}

View File

@ -201,7 +201,7 @@ describe('UtapiReindex', () => {
shouldLeave = res === value;
return setTimeout(next, 200);
}),
() => shouldLeave, cb);
next => next(null, shouldLeave), cb);
}
function checkMetrics({ resource, expected }, cb) {

View File

@ -63,7 +63,7 @@ async function listMetrics(level, resources, start, end) {
}
async function ingestEvents(events) {
return events.length === await warp10.ingest('utapi.event', events);
return events.length === await warp10.ingest({ className: 'utapi.event' }, events);
}
function opsToResp(operations) {

View File

@ -59,7 +59,7 @@ describe('Test CreateCheckpoint', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(0));
const results = await warp10.fetch({
@ -88,8 +88,8 @@ describe('Test CreateCheckpoint', function () {
accounts,
);
await warp10.ingest('utapi.event', historicalEvents);
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, historicalEvents);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-400));
await checkpointTask._execute(getTs(0));
@ -109,7 +109,7 @@ describe('Test CreateCheckpoint', function () {
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
);
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-100));
let results = await warp10.fetch({

View File

@ -67,7 +67,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-1));
await snapshotTask._execute(getTs(0));
@ -86,7 +86,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 500,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-400));
await checkpointTask._execute(getTs(-300));
await checkpointTask._execute(getTs(-200));
@ -110,7 +110,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 500,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-300));
await snapshotTask._execute(getTs(-250));
@ -132,11 +132,11 @@ describe('Test CreateSnapshot', function () {
const stop = getTs(-50);
const { events, totals } = generateCustomEvents(start, stop, 500, accounts);
await warp10.ingest('utapi.event', events);
await warp10.ingest({ className: 'utapi.event' }, events);
await checkpointTask._execute(getTs(-1));
const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
await warp10.ingest('utapi.event', newEvents);
await warp10.ingest({ className: 'utapi.event' }, newEvents);
await checkpointTask._execute(getTs(100));
await snapshotTask._execute(getTs(0));
@ -156,7 +156,7 @@ describe('Test CreateSnapshot', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.repair.event', events);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await repairTask._execute(getTs(-1));
await snapshotTask._execute(getTs(0));

View File

@ -0,0 +1,89 @@
const assert = require('assert');
const uuid = require('uuid');
const { mpuBucketPrefix } = require('arsenal/lib/constants');
const { Warp10Client } = require('../../../../libV2/warp10');
const { ReindexTask } = require('../../../../libV2/tasks');
const { BucketD, values } = require('../../../utils/mock/');
const { protobuf } = require('../../../utils/v2Data');
const { CANONICAL_ID, BUCKET_NAME } = values;
// Bucket-listing sizes to exercise; 251 presumably spans more than one
// listing page in the mock — confirm against BucketD's page size.
const bucketCounts = [1, 251];

// Expected decoded `utapi.repair.reindex` record for a single bucket: the
// mock bucketd serves one 1024-byte object per bucket.
const bucketRecord = {
    ops: {},
    sizeD: 1024,
    objD: 1,
    inB: 0,
    outB: 0,
};

// Expected account-level record: one data bucket plus its shadow MPU
// bucket, 1024 bytes / 1 object each.
const accountRecord = {
    ops: {},
    sizeD: 2048,
    objD: 2,
    inB: 0,
    outB: 0,
};

// eslint-disable-next-line func-names
describe('Test ReindexTask', function () {
    this.timeout(120000);

    let prefix;
    let warp10;
    let reindexTask;
    const bucketd = new BucketD(true);

    before(() => bucketd.start());

    beforeEach(() => {
        // Unique node id per test so fetches only see this test's records.
        prefix = uuid.v4();
        reindexTask = new ReindexTask({ warp10: { nodeId: prefix } });
        reindexTask._program = { nodeId: prefix };
        warp10 = new Warp10Client({ nodeId: prefix });
    });

    afterEach(() => {
        bucketd.reset();
    });

    // Fetch the single corrective series matching `labels` and assert its
    // protobuf-decoded payload equals `value`.
    async function assertResult(labels, value) {
        const results = await warp10.fetch({
            className: 'utapi.repair.reindex',
            labels,
            start: 'now',
            stop: -100,
        });
        assert.strictEqual(results.result.length, 1);
        assert.notStrictEqual(results.result[0], '');
        const series = JSON.parse(results.result[0]);
        assert.strictEqual(series.length, 1);
        const record = protobuf.decode('Record', series[0].v[0][1]);
        assert.deepStrictEqual(record, value);
    }

    bucketCounts.forEach(count => {
        it(`should reindex bucket listing with a length of ${count}`, async () => {
            const bucket = `${BUCKET_NAME}-${count}`;
            const mpuBucket = `${mpuBucketPrefix}${bucket}`;
            // Seed the mock: one 1024-byte object in the bucket and one in
            // its shadow MPU bucket, plus `count` listed buckets.
            bucketd
                .setBucketContent({
                    bucketName: bucket,
                    contentLength: 1024,
                })
                .setBucketContent({
                    bucketName: mpuBucket,
                    contentLength: 1024,
                })
                .setBucketCount(count)
                .createBuckets();

            await reindexTask._execute();

            // Expect corrective records at bucket, shadow-bucket, and
            // account level, all tagged with this test's node id.
            await assertResult({ bck: bucket, node: prefix }, bucketRecord);
            await assertResult({ bck: mpuBucket, node: prefix }, bucketRecord);
            await assertResult({ acc: CANONICAL_ID, node: prefix }, accountRecord);
        });
    });
});

View File

@ -60,7 +60,7 @@ describe('Test Repair', function () {
const { events, totals } = generateCustomEvents(start, stop, 100,
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
await warp10.ingest('utapi.repair.event', events);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await repairTask._execute(getTs(0));
const results = await warp10.fetch({
@ -89,8 +89,8 @@ describe('Test Repair', function () {
accounts,
);
await warp10.ingest('utapi.repair.event', historicalEvents);
await warp10.ingest('utapi.repair.event', events);
await warp10.ingest({ className: 'utapi.repair.event' }, historicalEvents);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await repairTask._execute(getTs(-400));
await repairTask._execute(getTs(0));
@ -110,7 +110,7 @@ describe('Test Repair', function () {
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
);
await warp10.ingest('utapi.repair.event', events);
await warp10.ingest({ className: 'utapi.repair.event' }, events);
await repairTask._execute(getTs(-100));
let results = await warp10.fetch({

View File

@ -19,13 +19,13 @@ describe('Test Warp Client', () => {
});
it('should ingest records', async () => {
const res = await warp10.ingest(className, testValues);
const res = await warp10.ingest({ className }, testValues);
assert.strictEqual(res, testValues.length);
});
// TODO after the macro encoder is written this will need to be updated
it('should fetch records', async () => {
await warp10.ingest(className, testValues);
await warp10.ingest({ className }, testValues);
const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
const parsed = JSON.parse(res.result[0])[0];
assert.strictEqual(parsed.c, className);

View File

@ -7,14 +7,15 @@ const { models, constants } = require('arsenal');
const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
const { ObjectMD } = models;
const app = express();
class BucketD {
constructor() {
constructor(isV2 = false) {
this._server = null;
this._bucketCount = 0;
this._bucketContent = {};
this._buckets = [];
this._isV2 = isV2;
this._app = express();
}
clearBuckets() {
@ -61,14 +62,14 @@ class BucketD {
CommonPrefixes: [],
};
const maxKeys = parseInt(req.query.maxKeys, 10);
if (req.query.marker) {
if (req.query.marker || req.query.gt) {
body.IsTruncated = false;
body.Contents = this._buckets.slice(maxKeys);
} else {
body.IsTruncated = maxKeys < this._bucketCount;
body.Contents = this._buckets.slice(0, maxKeys);
}
return JSON.stringify(body);
return body;
}
_getShadowBucketResponse(bucketName) {
@ -77,7 +78,7 @@ class BucketD {
IsTruncated: false,
Contents: this._bucketContent[bucketName] || [],
};
return JSON.stringify(body);
return body;
}
_getBucketResponse(bucketName) {
@ -86,7 +87,7 @@ class BucketD {
IsTruncated: false,
Contents: this._bucketContent[bucketName] || [],
};
return JSON.stringify(body);
return body;
}
_getShadowBucketOverviewResponse(bucketName) {
@ -99,11 +100,11 @@ class BucketD {
IsTruncated: false,
Uploads: mpus,
};
return JSON.stringify(body);
return body;
}
_initiateRoutes() {
app.param('bucketName', (req, res, next, bucketName) => {
this._app.param('bucketName', (req, res, next, bucketName) => {
/* eslint-disable no-param-reassign */
if (bucketName === constants.usersBucket) {
req.body = this._getUsersBucketResponse(req);
@ -115,26 +116,55 @@ class BucketD {
) {
req.body = this._getBucketResponse(bucketName);
}
// v2 reindex uses `Basic` listing type for everything
if (this._isV2) {
if (req.body && req.body.Contents) {
req.body = req.body.Contents;
}
}
/* eslint-enable no-param-reassign */
next();
});
app.get('/default/bucket/:bucketName', (req, res) => {
res.writeHead(200);
res.write(req.body);
res.end();
this._app.get('/default/attributes/:bucketName', (req, res) => {
const key = req.params.bucketName;
const bucket = this._bucketContent[key];
if (bucket) {
res.status(200).send({
name: key,
owner: CANONICAL_ID,
ownerDisplayName: 'steve',
creationDate: new Date(),
});
return;
}
res.statusMessage = 'DBNotFound';
res.status(404).end();
});
this._app.get('/default/bucket/:bucketName', (req, res) => {
res.status(200).send(req.body);
});
}
start() {
this._initiateRoutes();
const port = 9000;
this._server = http.createServer(app).listen(port);
this._server = http.createServer(this._app).listen(port);
}
end() {
this._server.close();
}
reset() {
this._bucketCount = 0;
this._bucketContent = {};
this._buckets = [];
}
}
module.exports = BucketD;

View File

@ -36,13 +36,14 @@
$operation_info 'labels' GET 'labels' STORE
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
$operation_info 'node' GET 'nodeID' STORE
// 'Fetching metrics for ' $labels ->JSON + LOGMSG
// 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG
$read_token $labels $startTimestamp @utapi/getMetricsAt 'startResults' STORE
$read_token $labels $startTimestamp $nodeID @utapi/getMetricsAt 'startResults' STORE
$read_token $labels $endTimestamp @utapi/getMetricsAt 'endResults' STORE
$read_token $labels $endTimestamp $nodeID @utapi/getMetricsAt 'endResults' STORE
// $startResults ->JSON LOGMSG
// $endResults ->JSON LOGMSG

View File

@ -25,6 +25,7 @@
<%
'getMetricsAt' SECTION
'nodeID' STORE
'endTimestamp' STORE
'labels' STORE
'read_token' STORE
@ -37,6 +38,7 @@
'utapi.checkpoint' 'checkpoint_class' STORE
'utapi.snapshot' 'snapshot_class' STORE
'utapi.repair.correction' 'correction_class' STORE
'utapi.repair.reindex' 'reindex_class' STORE
{} 'snapshots' STORE
@ -248,6 +250,25 @@
%> FOREACH
%> FOREACH
'load_reindex' SECTION
// Only load the latest reindex for the current node
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
{
'token' $read_token
'class' $reindex_class
'labels' $filterLabels
'end' $endTimestamp
'count' 1
} FETCH
<% // Handle multiple GTS
VALUES
<% // For each reindex correction
@utapi/decodeRecord
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
$results @util/sumRecord 'results' STORE
%> FOREACH
%> FOREACH
$results // Leave results on the stack
%>

View File

@ -0,0 +1,3 @@
<%
EVAL @utapi/encodeRecord
%>

730
yarn.lock

File diff suppressed because it is too large Load Diff