Compare commits
8 Commits
633d296c38
...
438a5fb2f2
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 438a5fb2f2 | |
Taylor McKinnon | a2fc75f72e | |
Taylor McKinnon | cc9420caee | |
Taylor McKinnon | 53cf86fef9 | |
Taylor McKinnon | b2b21e8da5 | |
Taylor McKinnon | 345df46708 | |
Taylor McKinnon | 267448abd5 | |
Taylor McKinnon | 1f15ab8923 |
|
@ -1,4 +1,3 @@
|
||||||
node_modules/
|
node_modules/
|
||||||
**/node_modules/
|
**/node_modules/
|
||||||
.git
|
.git
|
||||||
eve/
|
|
||||||
|
|
10
.eslintrc
10
.eslintrc
|
@ -3,5 +3,13 @@
|
||||||
"rules": {
|
"rules": {
|
||||||
"no-underscore-dangle": "off",
|
"no-underscore-dangle": "off",
|
||||||
"implicit-arrow-linebreak" : "off"
|
"implicit-arrow-linebreak" : "off"
|
||||||
}
|
},
|
||||||
|
"settings": {
|
||||||
|
"import/resolver": {
|
||||||
|
"node": {
|
||||||
|
"paths": ["/backbeat/node_modules", "node_modules"]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
const { tasks } = require('..');
|
||||||
|
const { LoggerContext } = require('../libV2/utils');
|
||||||
|
|
||||||
|
const logger = new LoggerContext({
|
||||||
|
task: 'Reindex',
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
const task = new tasks.ReindexTask();
|
||||||
|
|
||||||
|
task.setup()
|
||||||
|
.then(() => logger.info('Starting Reindex daemon'))
|
||||||
|
.then(() => task.start())
|
||||||
|
.then(() => logger.info('Reindex started'));
|
32
eve/main.yml
32
eve/main.yml
|
@ -16,15 +16,13 @@ models:
|
||||||
type: kube_pod
|
type: kube_pod
|
||||||
path: eve/workers/pod.yml
|
path: eve/workers/pod.yml
|
||||||
images:
|
images:
|
||||||
aggressor: eve/workers/unit_and_feature_tests
|
aggressor:
|
||||||
|
context: '.'
|
||||||
|
dockerfile: eve/workers/unit_and_feature_tests/Dockerfile
|
||||||
warp10:
|
warp10:
|
||||||
context: '.'
|
context: '.'
|
||||||
dockerfile: 'images/warp10/Dockerfile'
|
dockerfile: 'images/warp10/Dockerfile'
|
||||||
vault: eve/workers/mocks/vault
|
vault: eve/workers/mocks/vault
|
||||||
- Install: &install
|
|
||||||
name: install node modules
|
|
||||||
command: yarn install --frozen-lockfile
|
|
||||||
haltOnFailure: True
|
|
||||||
- Upload: &upload_artifacts
|
- Upload: &upload_artifacts
|
||||||
source: /artifacts
|
source: /artifacts
|
||||||
urls:
|
urls:
|
||||||
|
@ -53,7 +51,6 @@ stages:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run static analysis tools on markdown
|
name: run static analysis tools on markdown
|
||||||
command: yarn run lint_md
|
command: yarn run lint_md
|
||||||
|
@ -64,7 +61,6 @@ stages:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run unit tests
|
name: run unit tests
|
||||||
command: yarn test
|
command: yarn test
|
||||||
|
@ -72,34 +68,46 @@ stages:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run client tests
|
name: run client tests
|
||||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
|
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:client
|
||||||
|
logfiles:
|
||||||
|
utapi:
|
||||||
|
filename: "/artifacts/setup_ft_test:client.log"
|
||||||
|
follow: true
|
||||||
run-server-tests:
|
run-server-tests:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run server tests
|
name: run server tests
|
||||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
|
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:server
|
||||||
|
logfiles:
|
||||||
|
utapi:
|
||||||
|
filename: "/artifacts/setup_ft_test:server.log"
|
||||||
|
follow: true
|
||||||
run-cron-tests:
|
run-cron-tests:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run cron tests
|
name: run cron tests
|
||||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
|
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash false ft_test:cron
|
||||||
|
logfiles:
|
||||||
|
utapi:
|
||||||
|
filename: "/artifacts/setup_ft_test:cron.log"
|
||||||
|
follow: true
|
||||||
run-interval-tests:
|
run-interval-tests:
|
||||||
worker: *workspace
|
worker: *workspace
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: run interval tests
|
name: run interval tests
|
||||||
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
|
command: bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:interval
|
||||||
|
logfiles:
|
||||||
|
utapi:
|
||||||
|
filename: "/artifacts/setup_ft_test:interval.log"
|
||||||
|
follow: true
|
||||||
run-v2-functional-tests:
|
run-v2-functional-tests:
|
||||||
worker:
|
worker:
|
||||||
<< : *workspace
|
<< : *workspace
|
||||||
|
@ -107,7 +115,6 @@ stages:
|
||||||
vault: enabled
|
vault: enabled
|
||||||
steps:
|
steps:
|
||||||
- Git: *clone
|
- Git: *clone
|
||||||
- ShellCommand: *install
|
|
||||||
- ShellCommand:
|
- ShellCommand:
|
||||||
name: Wait for Warp 10
|
name: Wait for Warp 10
|
||||||
command: |
|
command: |
|
||||||
|
@ -123,6 +130,7 @@ stages:
|
||||||
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
|
command: SETUP_CMD="run start_v2:server" bash ./eve/workers/unit_and_feature_tests/run_ft_tests.bash true ft_test:v2
|
||||||
env:
|
env:
|
||||||
UTAPI_CACHE_BACKEND: redis
|
UTAPI_CACHE_BACKEND: redis
|
||||||
|
UTAPI_LOG_LEVEL: trace
|
||||||
logfiles:
|
logfiles:
|
||||||
warp10:
|
warp10:
|
||||||
filename: "/artifacts/warp10.log"
|
filename: "/artifacts/warp10.log"
|
||||||
|
|
|
@ -3,12 +3,18 @@ FROM buildpack-deps:jessie-curl
|
||||||
#
|
#
|
||||||
# Install apt packages needed by utapi and buildbot_worker
|
# Install apt packages needed by utapi and buildbot_worker
|
||||||
#
|
#
|
||||||
|
|
||||||
ENV LANG C.UTF-8
|
ENV LANG C.UTF-8
|
||||||
ENV NODE_VERSION 10.22.0
|
ENV NODE_VERSION 10.22.0
|
||||||
COPY utapi_packages.list buildbot_worker_packages.list /tmp/
|
ENV PATH=$PATH:/utapi/node_modules/.bin
|
||||||
|
ENV NODE_PATH=/utapi/node_modules
|
||||||
|
|
||||||
|
COPY eve/workers/unit_and_feature_tests/utapi_packages.list eve/workers/unit_and_feature_tests/buildbot_worker_packages.list /tmp/
|
||||||
|
|
||||||
|
WORKDIR /utapi
|
||||||
|
|
||||||
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
|
RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x64.tar.gz \
|
||||||
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
|
&& tar -xf node-v${NODE_VERSION}-linux-x64.tar.gz --directory /usr/local --strip-components 1 \
|
||||||
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
|
&& curl -sS http://dl.yarnpkg.com/debian/pubkey.gpg | apt-key add - \
|
||||||
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
|
&& echo "deb http://dl.yarnpkg.com/debian/ stable main" | tee /etc/apt/sources.list.d/yarn.list \
|
||||||
&& apt-get update -qq \
|
&& apt-get update -qq \
|
||||||
|
@ -19,7 +25,15 @@ RUN wget https://nodejs.org/dist/v${NODE_VERSION}/node-v${NODE_VERSION}-linux-x6
|
||||||
&& rm -f /etc/supervisor/conf.d/*.conf \
|
&& rm -f /etc/supervisor/conf.d/*.conf \
|
||||||
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
|
&& rm -f node-v${NODE_VERSION}-linux-x64.tar.gz
|
||||||
|
|
||||||
|
#
|
||||||
|
# Install yarn dependencies
|
||||||
|
#
|
||||||
|
|
||||||
|
COPY package.json yarn.lock /utapi/
|
||||||
|
|
||||||
|
RUN yarn cache clean \
|
||||||
|
&& yarn install --frozen-lockfile \
|
||||||
|
&& yarn cache clean
|
||||||
#
|
#
|
||||||
# Run buildbot-worker on startup through supervisor
|
# Run buildbot-worker on startup through supervisor
|
||||||
#
|
#
|
||||||
|
@ -28,7 +42,7 @@ ARG BUILDBOT_VERSION
|
||||||
RUN pip install buildbot-worker==$BUILDBOT_VERSION
|
RUN pip install buildbot-worker==$BUILDBOT_VERSION
|
||||||
RUN pip3 install requests
|
RUN pip3 install requests
|
||||||
RUN pip3 install redis
|
RUN pip3 install redis
|
||||||
ADD supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
|
ADD eve/workers/unit_and_feature_tests/supervisor/buildbot_worker.conf /etc/supervisor/conf.d/
|
||||||
ADD redis/sentinel.conf /etc/sentinel.conf
|
ADD eve/workers/unit_and_feature_tests/redis/sentinel.conf /etc/sentinel.conf
|
||||||
|
|
||||||
CMD ["supervisord", "-n"]
|
CMD ["supervisord", "-n"]
|
||||||
|
|
|
@ -17,6 +17,6 @@ if [ -z "$SETUP_CMD" ]; then
|
||||||
SETUP_CMD="start"
|
SETUP_CMD="start"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD | tee -a "/artifacts/setup_$2.log" &
|
UTAPI_INTERVAL_TEST_MODE=$1 npm $SETUP_CMD 2>&1 | tee -a "/artifacts/setup_$2.log" &
|
||||||
bash tests/utils/wait_for_local_port.bash $PORT 40
|
bash tests/utils/wait_for_local_port.bash $PORT 40
|
||||||
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"
|
UTAPI_INTERVAL_TEST_MODE=$1 npm run $2 | tee -a "/artifacts/test_$2.log"
|
||||||
|
|
|
@ -490,7 +490,7 @@ class UtapiClient {
|
||||||
return done();
|
return done();
|
||||||
}),
|
}),
|
||||||
// if cursor is 0, it reached end of scan
|
// if cursor is 0, it reached end of scan
|
||||||
() => cursor === '0',
|
cb => cb(null, cursor === '0'),
|
||||||
err => callback(err, keys),
|
err => callback(err, keys),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,5 +26,10 @@
|
||||||
"ingestionSchedule": "*/5 * * * * *",
|
"ingestionSchedule": "*/5 * * * * *",
|
||||||
"checkpointSchedule": "*/30 * * * * *",
|
"checkpointSchedule": "*/30 * * * * *",
|
||||||
"snapshotSchedule": "* 0 * * * *",
|
"snapshotSchedule": "* 0 * * * *",
|
||||||
"repairSchedule": "* */5 * * * *"
|
"repairSchedule": "* */5 * * * *",
|
||||||
|
"bucketd": [ "localhost:9000" ],
|
||||||
|
"reindex": {
|
||||||
|
"enabled": true,
|
||||||
|
"schedule": "0 0 0 * * 6"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -248,6 +248,8 @@ class Config {
|
||||||
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
|
port: _loadFromEnv('VAULT_PORT', config.vaultd.port),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
parsedConfig.bucketd = _loadFromEnv('BUCKETD_BOOTSTRAP', config.bucketd, _typeCasts.serverList);
|
||||||
|
|
||||||
return parsedConfig;
|
return parsedConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,6 @@ const warp10MultiHost = Joi.object({
|
||||||
writeToken: Joi.string(),
|
writeToken: Joi.string(),
|
||||||
});
|
});
|
||||||
|
|
||||||
Joi.array().items(warp10SingleHost);
|
|
||||||
|
|
||||||
const schema = Joi.object({
|
const schema = Joi.object({
|
||||||
host: Joi.string(),
|
host: Joi.string(),
|
||||||
|
@ -55,6 +54,11 @@ const schema = Joi.object({
|
||||||
host: Joi.string().hostname(),
|
host: Joi.string().hostname(),
|
||||||
port: Joi.number().port(),
|
port: Joi.number().port(),
|
||||||
}),
|
}),
|
||||||
|
reindex: Joi.object({
|
||||||
|
enabled: Joi.boolean(),
|
||||||
|
schedule: Joi.string(),
|
||||||
|
}),
|
||||||
|
bucketd: Joi.array().items(Joi.string()),
|
||||||
expireMetrics: Joi.boolean(),
|
expireMetrics: Joi.boolean(),
|
||||||
expireMetricsTTL: Joi.number(),
|
expireMetricsTTL: Joi.number(),
|
||||||
cacheBackend: Joi.string().valid('memory', 'redis'),
|
cacheBackend: Joi.string().valid('memory', 'redis'),
|
||||||
|
|
|
@ -84,12 +84,14 @@ const constants = {
|
||||||
buckets: 'bck',
|
buckets: 'bck',
|
||||||
},
|
},
|
||||||
|
|
||||||
warp10ValueType: ':m:utapi/event:',
|
warp10EventType: ':m:utapi/event:',
|
||||||
|
warp10RecordType: ':m:utapi/record:',
|
||||||
truthy,
|
truthy,
|
||||||
shardIngestLagSecs: 30,
|
shardIngestLagSecs: 30,
|
||||||
checkpointLagSecs: 300,
|
checkpointLagSecs: 300,
|
||||||
snapshotLagSecs: 900,
|
snapshotLagSecs: 900,
|
||||||
repairLagSecs: 5,
|
repairLagSecs: 5,
|
||||||
|
keyVersionSplitter: String.fromCharCode(0),
|
||||||
};
|
};
|
||||||
|
|
||||||
constants.operationToResponse = constants.operations
|
constants.operationToResponse = constants.operations
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
const BucketClientInterface = require('arsenal/lib/storage/metadata/bucketclient/BucketClientInterface');
|
||||||
|
const bucketclient = require('bucketclient');
|
||||||
|
|
||||||
|
const config = require('../config');
|
||||||
|
const { LoggerContext } = require('../utils');
|
||||||
|
|
||||||
|
const moduleLogger = new LoggerContext({
|
||||||
|
module: 'metadata.client',
|
||||||
|
});
|
||||||
|
|
||||||
|
const params = {
|
||||||
|
bucketdBootstrap: config.bucketd,
|
||||||
|
https: config.https,
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = new BucketClientInterface(params, bucketclient, moduleLogger);
|
|
@ -0,0 +1,116 @@
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
|
const { usersBucket, splitter: mdKeySplitter, mpuBucketPrefix } = require('arsenal').constants;
|
||||||
|
const metadata = require('./client');
|
||||||
|
const { LoggerContext, logger } = require('../utils');
|
||||||
|
const { keyVersionSplitter } = require('../constants');
|
||||||
|
|
||||||
|
const moduleLogger = new LoggerContext({
|
||||||
|
module: 'metadata.client',
|
||||||
|
});
|
||||||
|
|
||||||
|
const PAGE_SIZE = 1000;
|
||||||
|
|
||||||
|
function _listingWrapper(bucket, params) {
|
||||||
|
return new Promise(
|
||||||
|
(resolve, reject) => metadata.listObject(
|
||||||
|
bucket,
|
||||||
|
params,
|
||||||
|
logger.newRequestLogger(),
|
||||||
|
(err, res) => {
|
||||||
|
if (err) {
|
||||||
|
reject(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resolve(res);
|
||||||
|
},
|
||||||
|
),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
function _listObject(bucket, prefix, hydrateFunc) {
|
||||||
|
const listingParams = { prefix, maxKeys: PAGE_SIZE, listingType: 'Basic' };
|
||||||
|
let gt;
|
||||||
|
return {
|
||||||
|
async* [Symbol.asyncIterator]() {
|
||||||
|
while (true) {
|
||||||
|
let res;
|
||||||
|
|
||||||
|
try {
|
||||||
|
// eslint-disable-next-line no-await-in-loop
|
||||||
|
res = await _listingWrapper(bucket, { ...listingParams, gt });
|
||||||
|
} catch (error) {
|
||||||
|
moduleLogger.error('Error during listing', { error });
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const item of res) {
|
||||||
|
yield hydrateFunc ? hydrateFunc(item) : item;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res.length !== PAGE_SIZE) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
gt = res[res.length - 1].key;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function listObjects(bucket) {
|
||||||
|
return _listObject(bucket, '', data => {
|
||||||
|
const { key, value } = data;
|
||||||
|
const [name, version] = key.split(keyVersionSplitter);
|
||||||
|
return {
|
||||||
|
name,
|
||||||
|
version,
|
||||||
|
value: JSON.parse(value),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function listBuckets() {
|
||||||
|
return _listObject(usersBucket, '', data => {
|
||||||
|
const { key, value } = data;
|
||||||
|
const [account, name] = key.split(mdKeySplitter);
|
||||||
|
return {
|
||||||
|
account,
|
||||||
|
name,
|
||||||
|
value: JSON.parse(value),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
async function listMPUs(bucket) {
|
||||||
|
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
|
||||||
|
return _listObject(mpuBucket, '', data => {
|
||||||
|
const { key, value } = data;
|
||||||
|
const [account, name] = key.split(mdKeySplitter);
|
||||||
|
return {
|
||||||
|
account,
|
||||||
|
name,
|
||||||
|
value: JSON.parse(value),
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function bucketExists(bucket) {
|
||||||
|
return new Promise((resolve, reject) => metadata.getBucketAttributes(
|
||||||
|
bucket,
|
||||||
|
logger.newRequestLogger(),
|
||||||
|
err => {
|
||||||
|
if (err && !err.NoSuchBucket) {
|
||||||
|
reject(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
resolve(err === null);
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
listBuckets,
|
||||||
|
listObjects,
|
||||||
|
listMPUs,
|
||||||
|
bucketExists,
|
||||||
|
};
|
|
@ -0,0 +1,12 @@
|
||||||
|
const Joi = require('@hapi/joi');
|
||||||
|
const { buildModel } = require('./Base');
|
||||||
|
|
||||||
|
const recordSchema = {
|
||||||
|
timestamp: Joi.number(),
|
||||||
|
objectDelta: Joi.number(),
|
||||||
|
sizeDelta: Joi.number(),
|
||||||
|
incomingBytes: Joi.number(),
|
||||||
|
outgoingBytes: Joi.number(),
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = buildModel('UtapiRecord', recordSchema);
|
|
@ -1,5 +1,6 @@
|
||||||
const BaseModel = require('./Base');
|
const BaseModel = require('./Base');
|
||||||
const UtapiMetric = require('./UtapiMetric');
|
const UtapiMetric = require('./UtapiMetric');
|
||||||
|
const UtapiRecord = require('./UtapiRecord');
|
||||||
const RequestContext = require('./RequestContext');
|
const RequestContext = require('./RequestContext');
|
||||||
const ResponseContainer = require('./ResponseContainer');
|
const ResponseContainer = require('./ResponseContainer');
|
||||||
|
|
||||||
|
@ -8,4 +9,5 @@ module.exports = {
|
||||||
UtapiMetric,
|
UtapiMetric,
|
||||||
RequestContext,
|
RequestContext,
|
||||||
ResponseContainer,
|
ResponseContainer,
|
||||||
|
UtapiRecord,
|
||||||
};
|
};
|
||||||
|
|
|
@ -2,6 +2,7 @@ const errors = require('../../../errors');
|
||||||
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
|
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
|
||||||
const { convertTimestamp } = require('../../../utils');
|
const { convertTimestamp } = require('../../../utils');
|
||||||
const { client: warp10 } = require('../../../warp10');
|
const { client: warp10 } = require('../../../warp10');
|
||||||
|
const config = require('../../../config');
|
||||||
|
|
||||||
const emptyOperationsResponse = Object.values(operationToResponse)
|
const emptyOperationsResponse = Object.values(operationToResponse)
|
||||||
.reduce((prev, key) => {
|
.reduce((prev, key) => {
|
||||||
|
@ -32,6 +33,7 @@ async function listMetric(ctx, params) {
|
||||||
start,
|
start,
|
||||||
end,
|
end,
|
||||||
labels,
|
labels,
|
||||||
|
node: config.nodeId,
|
||||||
},
|
},
|
||||||
macro: 'utapi/getMetrics',
|
macro: 'utapi/getMetrics',
|
||||||
};
|
};
|
||||||
|
|
|
@ -59,7 +59,7 @@ class IngestShardTask extends BaseTask {
|
||||||
return new UtapiMetric(metric);
|
return new UtapiMetric(metric);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
const status = await this._warp10.ingest(metricClass, records);
|
const status = await this._warp10.ingest({ className: metricClass }, records);
|
||||||
assert.strictEqual(status, records.length);
|
assert.strictEqual(status, records.length);
|
||||||
await this._cache.deleteShard(shard);
|
await this._cache.deleteShard(shard);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -0,0 +1,177 @@
|
||||||
|
/* eslint-disable no-restricted-syntax */
|
||||||
|
const async = require('async');
|
||||||
|
const { mpuBucketPrefix } = require('arsenal').constants;
|
||||||
|
const BaseTask = require('./BaseTask');
|
||||||
|
const { UtapiRecord } = require('../models');
|
||||||
|
const config = require('../config');
|
||||||
|
const metadata = require('../metadata');
|
||||||
|
const { serviceToWarp10Label, warp10RecordType } = require('../constants');
|
||||||
|
|
||||||
|
const { LoggerContext, convertTimestamp } = require('../utils');
|
||||||
|
|
||||||
|
const logger = new LoggerContext({
|
||||||
|
module: 'ReindexTask',
|
||||||
|
});
|
||||||
|
|
||||||
|
class ReindexTask extends BaseTask {
|
||||||
|
constructor(options) {
|
||||||
|
super({
|
||||||
|
warp10: {
|
||||||
|
requestTimeout: 30000,
|
||||||
|
connectTimeout: 30000,
|
||||||
|
},
|
||||||
|
...options,
|
||||||
|
});
|
||||||
|
this._defaultSchedule = config.reindexSchedule;
|
||||||
|
this._defaultLag = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static async _indexBucket(bucket) {
|
||||||
|
let size = 0;
|
||||||
|
let count = 0;
|
||||||
|
let lastMaster = null;
|
||||||
|
let lastMasterSize = null;
|
||||||
|
|
||||||
|
for await (const obj of metadata.listObjects(bucket)) {
|
||||||
|
if (obj.value.isDeleteMarker) {
|
||||||
|
// eslint-disable-next-line no-continue
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
count += 1;
|
||||||
|
size += obj.value['content-length'];
|
||||||
|
|
||||||
|
// If versioned, subtract the size of the master to avoid double counting
|
||||||
|
if (lastMaster && obj.name === lastMaster) {
|
||||||
|
logger.debug('Detected versioned key, subtracting master size', { lastMasterSize, key: obj.name });
|
||||||
|
size -= lastMasterSize;
|
||||||
|
count -= 1;
|
||||||
|
lastMaster = null;
|
||||||
|
lastMasterSize = null;
|
||||||
|
// Only save master versions
|
||||||
|
} else if (!obj.version) {
|
||||||
|
lastMaster = obj.name;
|
||||||
|
lastMasterSize = obj.value['content-length'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return { size, count };
|
||||||
|
}
|
||||||
|
|
||||||
|
static async _indexMpuBucket(bucket) {
|
||||||
|
if (await metadata.bucketExists(bucket)) {
|
||||||
|
return ReindexTask._indexBucket(bucket);
|
||||||
|
}
|
||||||
|
|
||||||
|
return { size: 0, count: 0 };
|
||||||
|
}
|
||||||
|
|
||||||
|
async _fetchCurrentMetrics(level, resource) {
|
||||||
|
const timestamp = convertTimestamp(new Date().getTime());
|
||||||
|
const options = {
|
||||||
|
params: {
|
||||||
|
start: timestamp,
|
||||||
|
end: timestamp,
|
||||||
|
node: this._nodeId,
|
||||||
|
labels: {
|
||||||
|
[level]: resource,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
macro: 'utapi/getMetrics',
|
||||||
|
};
|
||||||
|
const res = await this._warp10.exec(options);
|
||||||
|
return { timestamp, value: JSON.parse(res.result[0]) };
|
||||||
|
}
|
||||||
|
|
||||||
|
async _updateMetric(level, resource, total) {
|
||||||
|
const { timestamp, value } = await this._fetchCurrentMetrics(level, resource);
|
||||||
|
|
||||||
|
const objectDelta = total.count - value.numberOfObjects[0];
|
||||||
|
const sizeDelta = total.size - value.storageUtilized[0];
|
||||||
|
|
||||||
|
if (objectDelta !== 0 || sizeDelta !== 0) {
|
||||||
|
logger.info('discrepancy detected in metrics, writing corrective record',
|
||||||
|
{ [level]: resource, objectDelta, sizeDelta });
|
||||||
|
|
||||||
|
const record = new UtapiRecord({
|
||||||
|
objectDelta,
|
||||||
|
sizeDelta,
|
||||||
|
timestamp,
|
||||||
|
});
|
||||||
|
|
||||||
|
await this._warp10.ingest(
|
||||||
|
{
|
||||||
|
className: 'utapi.repair.reindex',
|
||||||
|
labels: {
|
||||||
|
[level]: resource,
|
||||||
|
},
|
||||||
|
valueType: warp10RecordType,
|
||||||
|
},
|
||||||
|
[record],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async _execute() {
|
||||||
|
logger.debug('reindexing objects');
|
||||||
|
|
||||||
|
const accountTotals = {};
|
||||||
|
const ignoredAccounts = new Set();
|
||||||
|
|
||||||
|
await async.eachLimit(metadata.listBuckets(), 5, async bucket => {
|
||||||
|
logger.trace('starting reindex of bucket', { bucket: bucket.name });
|
||||||
|
|
||||||
|
const mpuBucket = `${mpuBucketPrefix}${bucket.name}`;
|
||||||
|
let bktTotal;
|
||||||
|
let mpuTotal;
|
||||||
|
|
||||||
|
try {
|
||||||
|
bktTotal = await async.retryable(ReindexTask._indexBucket)(bucket.name);
|
||||||
|
mpuTotal = await async.retryable(ReindexTask._indexMpuBucket)(mpuBucket);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error('failed to reindex bucket, ignoring associated account', { error, bucket: bucket.name });
|
||||||
|
ignoredAccounts.add(bucket.account);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const total = {
|
||||||
|
size: bktTotal.size + mpuTotal.size,
|
||||||
|
count: bktTotal.count + mpuTotal.count,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (accountTotals[bucket.account]) {
|
||||||
|
accountTotals[bucket.account].size += total.size;
|
||||||
|
accountTotals[bucket.account].count += total.count;
|
||||||
|
} else {
|
||||||
|
accountTotals[bucket.account] = { ...total };
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.trace('finished indexing bucket', { bucket: bucket.name });
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
this._updateMetric(
|
||||||
|
serviceToWarp10Label.buckets,
|
||||||
|
bucket.name,
|
||||||
|
bktTotal,
|
||||||
|
),
|
||||||
|
this._updateMetric(
|
||||||
|
serviceToWarp10Label.buckets,
|
||||||
|
mpuBucket,
|
||||||
|
mpuTotal,
|
||||||
|
)]);
|
||||||
|
});
|
||||||
|
|
||||||
|
const toUpdate = Object.entries(accountTotals)
|
||||||
|
.filter(([account]) => !ignoredAccounts.has(account));
|
||||||
|
|
||||||
|
await async.eachLimit(toUpdate, 5, async ([account, total]) =>
|
||||||
|
this._updateMetric(
|
||||||
|
serviceToWarp10Label.accounts,
|
||||||
|
account,
|
||||||
|
total,
|
||||||
|
));
|
||||||
|
|
||||||
|
logger.debug('finished reindexing');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = ReindexTask;
|
|
@ -3,6 +3,7 @@ const IngestShard = require('./IngestShard');
|
||||||
const CreateCheckpoint = require('./CreateCheckpoint');
|
const CreateCheckpoint = require('./CreateCheckpoint');
|
||||||
const CreateSnapshot = require('./CreateSnapshot');
|
const CreateSnapshot = require('./CreateSnapshot');
|
||||||
const RepairTask = require('./Repair');
|
const RepairTask = require('./Repair');
|
||||||
|
const ReindexTask = require('./Reindex');
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
IngestShard,
|
IngestShard,
|
||||||
|
@ -10,4 +11,5 @@ module.exports = {
|
||||||
CreateCheckpoint,
|
CreateCheckpoint,
|
||||||
CreateSnapshot,
|
CreateSnapshot,
|
||||||
RepairTask,
|
RepairTask,
|
||||||
|
ReindexTask,
|
||||||
};
|
};
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const { Warp10 } = require('@senx/warp10');
|
const { Warp10 } = require('@senx/warp10');
|
||||||
const { eventFieldsToWarp10, warp10ValueType } = require('./constants');
|
const assert = require('assert');
|
||||||
|
const { eventFieldsToWarp10, warp10EventType } = require('./constants');
|
||||||
const _config = require('./config');
|
const _config = require('./config');
|
||||||
const { LoggerContext } = require('./utils');
|
const { LoggerContext } = require('./utils');
|
||||||
const errors = require('./errors');
|
const errors = require('./errors');
|
||||||
|
@ -44,29 +45,39 @@ class Warp10Client {
|
||||||
// eslint-disable-next-line no-await-in-loop
|
// eslint-disable-next-line no-await-in-loop
|
||||||
return await func(client, ...params);
|
return await func(client, ...params);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
moduleLogger.warn('error during warp10 operation, failing over to next host', { error });
|
moduleLogger.warn('error during warp10 operation, failing over to next host',
|
||||||
|
{ statusCode: error.statusCode, statusMessage: error.statusMessage });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
|
moduleLogger.error('no remaining warp10 hosts to try, unable to complete request');
|
||||||
throw errors.InternalError;
|
throw errors.InternalError;
|
||||||
}
|
}
|
||||||
|
|
||||||
static _packEvent(event) {
|
static _packEvent(valueType, event) {
|
||||||
const packed = Object.entries(event.getValue())
|
const packed = Object.entries(event.getValue())
|
||||||
.filter(([key]) => eventFieldsToWarp10[key])
|
.filter(([key]) => eventFieldsToWarp10[key])
|
||||||
.map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
|
.map(([key, value]) => `'${eventFieldsToWarp10[key]}' ${_stringify(value)}`)
|
||||||
.join(' ');
|
.join(' ');
|
||||||
return `${warp10ValueType}{ ${packed} }`;
|
return `${valueType}{ ${packed} }`;
|
||||||
}
|
}
|
||||||
|
|
||||||
_buildGTSEntry(className, event) {
|
_buildGTSEntry(className, valueType, labels, event) {
|
||||||
const labels = this._clients[0].formatLabels({ node: this._nodeId });
|
const _labels = this._clients[0].formatLabels({ node: this._nodeId, ...labels });
|
||||||
const packed = Warp10Client._packEvent(event);
|
const packed = Warp10Client._packEvent(valueType, event);
|
||||||
return `${event.timestamp}// ${className}${labels} ${packed}`;
|
return `${event.timestamp}// ${className}${_labels} ${packed}`;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _ingest(warp10, className, events) {
|
async _ingest(warp10, metadata, events) {
|
||||||
const payload = events.map(ev => this._buildGTSEntry(className, ev));
|
const { className, valueType, labels } = metadata;
|
||||||
|
assert.notStrictEqual(className, undefined, 'you must provide a className');
|
||||||
|
const payload = events.map(
|
||||||
|
ev => this._buildGTSEntry(
|
||||||
|
className,
|
||||||
|
valueType || warp10EventType,
|
||||||
|
labels || {},
|
||||||
|
ev,
|
||||||
|
),
|
||||||
|
);
|
||||||
const res = await warp10.update(this._writeToken, payload);
|
const res = await warp10.update(this._writeToken, payload);
|
||||||
return res.count;
|
return res.count;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,9 +19,10 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@hapi/joi": "^17.1.1",
|
"@hapi/joi": "^17.1.1",
|
||||||
"@senx/warp10": "^1.0.10",
|
"@senx/warp10": "^1.0.10",
|
||||||
"arsenal": "scality/Arsenal#32c895b",
|
"arsenal": "scality/Arsenal#aa9c9e5",
|
||||||
"async": "^2.0.1",
|
"async": "^3.2.0",
|
||||||
"body-parser": "^1.19.0",
|
"body-parser": "^1.19.0",
|
||||||
|
"bucketclient": "scality/bucketclient",
|
||||||
"commander": "^5.1.0",
|
"commander": "^5.1.0",
|
||||||
"cron-parser": "^2.15.0",
|
"cron-parser": "^2.15.0",
|
||||||
"express": "^4.17.1",
|
"express": "^4.17.1",
|
||||||
|
@ -62,6 +63,7 @@
|
||||||
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
|
"start_v2:task:checkpoint": "ENABLE_UTAPI_V2=1 node bin/createCheckpoint.js",
|
||||||
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
"start_v2:task:snapshot": "ENABLE_UTAPI_V2=1 node bin/createSnapshot.js",
|
||||||
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
|
"start_v2:task:repair": "ENABLE_UTAPI_V2=1 node bin/repair.js",
|
||||||
|
"start_v2:task:reindex": "ENABLE_UTAPI_V2=1 node bin/reindex.js",
|
||||||
"start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
|
"start_v2:server": "ENABLE_UTAPI_V2=1 node bin/server.js",
|
||||||
"start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
|
"start_v2:server:dev": "UTAPI_DEV_MODE=t ENABLE_UTAPI_V2=t yarn nodemon --watch './**/*.js' --watch './**/*.json' --watch './**/*.yaml' --exec node bin/server.js"
|
||||||
}
|
}
|
||||||
|
|
|
@ -201,7 +201,7 @@ describe('UtapiReindex', () => {
|
||||||
shouldLeave = res === value;
|
shouldLeave = res === value;
|
||||||
return setTimeout(next, 200);
|
return setTimeout(next, 200);
|
||||||
}),
|
}),
|
||||||
() => shouldLeave, cb);
|
next => next(null, shouldLeave), cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
function checkMetrics({ resource, expected }, cb) {
|
function checkMetrics({ resource, expected }, cb) {
|
||||||
|
|
|
@ -63,7 +63,7 @@ async function listMetrics(level, resources, start, end) {
|
||||||
}
|
}
|
||||||
|
|
||||||
async function ingestEvents(events) {
|
async function ingestEvents(events) {
|
||||||
return events.length === await warp10.ingest('utapi.event', events);
|
return events.length === await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
}
|
}
|
||||||
|
|
||||||
function opsToResp(operations) {
|
function opsToResp(operations) {
|
||||||
|
|
|
@ -59,7 +59,7 @@ describe('Test CreateCheckpoint', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(0));
|
await checkpointTask._execute(getTs(0));
|
||||||
|
|
||||||
const results = await warp10.fetch({
|
const results = await warp10.fetch({
|
||||||
|
@ -88,8 +88,8 @@ describe('Test CreateCheckpoint', function () {
|
||||||
accounts,
|
accounts,
|
||||||
);
|
);
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', historicalEvents);
|
await warp10.ingest({ className: 'utapi.event' }, historicalEvents);
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-400));
|
await checkpointTask._execute(getTs(-400));
|
||||||
await checkpointTask._execute(getTs(0));
|
await checkpointTask._execute(getTs(0));
|
||||||
|
|
||||||
|
@ -109,7 +109,7 @@ describe('Test CreateCheckpoint', function () {
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
||||||
);
|
);
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-100));
|
await checkpointTask._execute(getTs(-100));
|
||||||
|
|
||||||
let results = await warp10.fetch({
|
let results = await warp10.fetch({
|
||||||
|
|
|
@ -67,7 +67,7 @@ describe('Test CreateSnapshot', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-1));
|
await checkpointTask._execute(getTs(-1));
|
||||||
await snapshotTask._execute(getTs(0));
|
await snapshotTask._execute(getTs(0));
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ describe('Test CreateSnapshot', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 500,
|
const { events, totals } = generateCustomEvents(start, stop, 500,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-400));
|
await checkpointTask._execute(getTs(-400));
|
||||||
await checkpointTask._execute(getTs(-300));
|
await checkpointTask._execute(getTs(-300));
|
||||||
await checkpointTask._execute(getTs(-200));
|
await checkpointTask._execute(getTs(-200));
|
||||||
|
@ -110,7 +110,7 @@ describe('Test CreateSnapshot', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 500,
|
const { events, totals } = generateCustomEvents(start, stop, 500,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-300));
|
await checkpointTask._execute(getTs(-300));
|
||||||
await snapshotTask._execute(getTs(-250));
|
await snapshotTask._execute(getTs(-250));
|
||||||
|
|
||||||
|
@ -132,11 +132,11 @@ describe('Test CreateSnapshot', function () {
|
||||||
const stop = getTs(-50);
|
const stop = getTs(-50);
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 500, accounts);
|
const { events, totals } = generateCustomEvents(start, stop, 500, accounts);
|
||||||
|
|
||||||
await warp10.ingest('utapi.event', events);
|
await warp10.ingest({ className: 'utapi.event' }, events);
|
||||||
await checkpointTask._execute(getTs(-1));
|
await checkpointTask._execute(getTs(-1));
|
||||||
|
|
||||||
const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
|
const { events: newEvents } = generateCustomEvents(getTs(10), getTs(100), 100, accounts);
|
||||||
await warp10.ingest('utapi.event', newEvents);
|
await warp10.ingest({ className: 'utapi.event' }, newEvents);
|
||||||
await checkpointTask._execute(getTs(100));
|
await checkpointTask._execute(getTs(100));
|
||||||
|
|
||||||
await snapshotTask._execute(getTs(0));
|
await snapshotTask._execute(getTs(0));
|
||||||
|
@ -156,7 +156,7 @@ describe('Test CreateSnapshot', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.repair.event', events);
|
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||||
await repairTask._execute(getTs(-1));
|
await repairTask._execute(getTs(-1));
|
||||||
await snapshotTask._execute(getTs(0));
|
await snapshotTask._execute(getTs(0));
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const uuid = require('uuid');
|
||||||
|
const { mpuBucketPrefix } = require('arsenal/lib/constants');
|
||||||
|
|
||||||
|
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||||
|
const { ReindexTask } = require('../../../../libV2/tasks');
|
||||||
|
const { BucketD, values } = require('../../../utils/mock/');
|
||||||
|
const { protobuf } = require('../../../utils/v2Data');
|
||||||
|
|
||||||
|
const { CANONICAL_ID, BUCKET_NAME } = values;
|
||||||
|
const bucketCounts = [1, 251];
|
||||||
|
|
||||||
|
const bucketRecord = {
|
||||||
|
ops: {},
|
||||||
|
sizeD: 1024,
|
||||||
|
objD: 1,
|
||||||
|
inB: 0,
|
||||||
|
outB: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
const accountRecord = {
|
||||||
|
ops: {},
|
||||||
|
sizeD: 2048,
|
||||||
|
objD: 2,
|
||||||
|
inB: 0,
|
||||||
|
outB: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
// eslint-disable-next-line func-names
|
||||||
|
describe('Test ReindexTask', function () {
|
||||||
|
this.timeout(120000);
|
||||||
|
|
||||||
|
let prefix;
|
||||||
|
let warp10;
|
||||||
|
let reindexTask;
|
||||||
|
const bucketd = new BucketD(true);
|
||||||
|
|
||||||
|
before(() => bucketd.start());
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
prefix = uuid.v4();
|
||||||
|
reindexTask = new ReindexTask({ warp10: { nodeId: prefix } });
|
||||||
|
reindexTask._program = { nodeId: prefix };
|
||||||
|
warp10 = new Warp10Client({ nodeId: prefix });
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
bucketd.reset();
|
||||||
|
});
|
||||||
|
|
||||||
|
async function assertResult(labels, value) {
|
||||||
|
const results = await warp10.fetch({
|
||||||
|
className: 'utapi.repair.reindex',
|
||||||
|
labels,
|
||||||
|
start: 'now',
|
||||||
|
stop: -100,
|
||||||
|
});
|
||||||
|
|
||||||
|
assert.strictEqual(results.result.length, 1);
|
||||||
|
assert.notStrictEqual(results.result[0], '');
|
||||||
|
const series = JSON.parse(results.result[0]);
|
||||||
|
assert.strictEqual(series.length, 1);
|
||||||
|
const record = protobuf.decode('Record', series[0].v[0][1]);
|
||||||
|
assert.deepStrictEqual(record, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
bucketCounts.forEach(count => {
|
||||||
|
it(`should reindex bucket listing with a length of ${count}`, async () => {
|
||||||
|
const bucket = `${BUCKET_NAME}-${count}`;
|
||||||
|
const mpuBucket = `${mpuBucketPrefix}${bucket}`;
|
||||||
|
bucketd
|
||||||
|
.setBucketContent({
|
||||||
|
bucketName: bucket,
|
||||||
|
contentLength: 1024,
|
||||||
|
})
|
||||||
|
.setBucketContent({
|
||||||
|
bucketName: mpuBucket,
|
||||||
|
contentLength: 1024,
|
||||||
|
})
|
||||||
|
.setBucketCount(count)
|
||||||
|
.createBuckets();
|
||||||
|
|
||||||
|
await reindexTask._execute();
|
||||||
|
await assertResult({ bck: bucket, node: prefix }, bucketRecord);
|
||||||
|
await assertResult({ bck: mpuBucket, node: prefix }, bucketRecord);
|
||||||
|
await assertResult({ acc: CANONICAL_ID, node: prefix }, accountRecord);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -60,7 +60,7 @@ describe('Test Repair', function () {
|
||||||
const { events, totals } = generateCustomEvents(start, stop, 100,
|
const { events, totals } = generateCustomEvents(start, stop, 100,
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } });
|
||||||
|
|
||||||
await warp10.ingest('utapi.repair.event', events);
|
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||||
await repairTask._execute(getTs(0));
|
await repairTask._execute(getTs(0));
|
||||||
|
|
||||||
const results = await warp10.fetch({
|
const results = await warp10.fetch({
|
||||||
|
@ -89,8 +89,8 @@ describe('Test Repair', function () {
|
||||||
accounts,
|
accounts,
|
||||||
);
|
);
|
||||||
|
|
||||||
await warp10.ingest('utapi.repair.event', historicalEvents);
|
await warp10.ingest({ className: 'utapi.repair.event' }, historicalEvents);
|
||||||
await warp10.ingest('utapi.repair.event', events);
|
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||||
await repairTask._execute(getTs(-400));
|
await repairTask._execute(getTs(-400));
|
||||||
await repairTask._execute(getTs(0));
|
await repairTask._execute(getTs(0));
|
||||||
|
|
||||||
|
@ -110,7 +110,7 @@ describe('Test Repair', function () {
|
||||||
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
{ [uuid.v4()]: { [uuid.v4()]: [uuid.v4()] } },
|
||||||
);
|
);
|
||||||
|
|
||||||
await warp10.ingest('utapi.repair.event', events);
|
await warp10.ingest({ className: 'utapi.repair.event' }, events);
|
||||||
await repairTask._execute(getTs(-100));
|
await repairTask._execute(getTs(-100));
|
||||||
|
|
||||||
let results = await warp10.fetch({
|
let results = await warp10.fetch({
|
||||||
|
|
|
@ -19,13 +19,13 @@ describe('Test Warp Client', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should ingest records', async () => {
|
it('should ingest records', async () => {
|
||||||
const res = await warp10.ingest(className, testValues);
|
const res = await warp10.ingest({ className }, testValues);
|
||||||
assert.strictEqual(res, testValues.length);
|
assert.strictEqual(res, testValues.length);
|
||||||
});
|
});
|
||||||
|
|
||||||
// TODO after the macro encoder is written this will need to be updated
|
// TODO after the macro encoder is written this will need to be updated
|
||||||
it('should fetch records', async () => {
|
it('should fetch records', async () => {
|
||||||
await warp10.ingest(className, testValues);
|
await warp10.ingest({ className }, testValues);
|
||||||
const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
|
const res = await warp10.fetch({ className, start: `${new Date().getTime()}000`, stop: -100 });
|
||||||
const parsed = JSON.parse(res.result[0])[0];
|
const parsed = JSON.parse(res.result[0])[0];
|
||||||
assert.strictEqual(parsed.c, className);
|
assert.strictEqual(parsed.c, className);
|
||||||
|
|
|
@ -7,14 +7,15 @@ const { models, constants } = require('arsenal');
|
||||||
const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
|
const { CANONICAL_ID, BUCKET_NAME, OBJECT_KEY } = require('./values');
|
||||||
|
|
||||||
const { ObjectMD } = models;
|
const { ObjectMD } = models;
|
||||||
const app = express();
|
|
||||||
|
|
||||||
class BucketD {
|
class BucketD {
|
||||||
constructor() {
|
constructor(isV2 = false) {
|
||||||
this._server = null;
|
this._server = null;
|
||||||
this._bucketCount = 0;
|
this._bucketCount = 0;
|
||||||
this._bucketContent = {};
|
this._bucketContent = {};
|
||||||
this._buckets = [];
|
this._buckets = [];
|
||||||
|
this._isV2 = isV2;
|
||||||
|
this._app = express();
|
||||||
}
|
}
|
||||||
|
|
||||||
clearBuckets() {
|
clearBuckets() {
|
||||||
|
@ -61,14 +62,14 @@ class BucketD {
|
||||||
CommonPrefixes: [],
|
CommonPrefixes: [],
|
||||||
};
|
};
|
||||||
const maxKeys = parseInt(req.query.maxKeys, 10);
|
const maxKeys = parseInt(req.query.maxKeys, 10);
|
||||||
if (req.query.marker) {
|
if (req.query.marker || req.query.gt) {
|
||||||
body.IsTruncated = false;
|
body.IsTruncated = false;
|
||||||
body.Contents = this._buckets.slice(maxKeys);
|
body.Contents = this._buckets.slice(maxKeys);
|
||||||
} else {
|
} else {
|
||||||
body.IsTruncated = maxKeys < this._bucketCount;
|
body.IsTruncated = maxKeys < this._bucketCount;
|
||||||
body.Contents = this._buckets.slice(0, maxKeys);
|
body.Contents = this._buckets.slice(0, maxKeys);
|
||||||
}
|
}
|
||||||
return JSON.stringify(body);
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
_getShadowBucketResponse(bucketName) {
|
_getShadowBucketResponse(bucketName) {
|
||||||
|
@ -77,7 +78,7 @@ class BucketD {
|
||||||
IsTruncated: false,
|
IsTruncated: false,
|
||||||
Contents: this._bucketContent[bucketName] || [],
|
Contents: this._bucketContent[bucketName] || [],
|
||||||
};
|
};
|
||||||
return JSON.stringify(body);
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
_getBucketResponse(bucketName) {
|
_getBucketResponse(bucketName) {
|
||||||
|
@ -86,7 +87,7 @@ class BucketD {
|
||||||
IsTruncated: false,
|
IsTruncated: false,
|
||||||
Contents: this._bucketContent[bucketName] || [],
|
Contents: this._bucketContent[bucketName] || [],
|
||||||
};
|
};
|
||||||
return JSON.stringify(body);
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
_getShadowBucketOverviewResponse(bucketName) {
|
_getShadowBucketOverviewResponse(bucketName) {
|
||||||
|
@ -99,11 +100,11 @@ class BucketD {
|
||||||
IsTruncated: false,
|
IsTruncated: false,
|
||||||
Uploads: mpus,
|
Uploads: mpus,
|
||||||
};
|
};
|
||||||
return JSON.stringify(body);
|
return body;
|
||||||
}
|
}
|
||||||
|
|
||||||
_initiateRoutes() {
|
_initiateRoutes() {
|
||||||
app.param('bucketName', (req, res, next, bucketName) => {
|
this._app.param('bucketName', (req, res, next, bucketName) => {
|
||||||
/* eslint-disable no-param-reassign */
|
/* eslint-disable no-param-reassign */
|
||||||
if (bucketName === constants.usersBucket) {
|
if (bucketName === constants.usersBucket) {
|
||||||
req.body = this._getUsersBucketResponse(req);
|
req.body = this._getUsersBucketResponse(req);
|
||||||
|
@ -115,26 +116,55 @@ class BucketD {
|
||||||
) {
|
) {
|
||||||
req.body = this._getBucketResponse(bucketName);
|
req.body = this._getBucketResponse(bucketName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// v2 reindex uses `Basic` listing type for everything
|
||||||
|
if (this._isV2) {
|
||||||
|
if (req.body && req.body.Contents) {
|
||||||
|
req.body = req.body.Contents;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* eslint-enable no-param-reassign */
|
/* eslint-enable no-param-reassign */
|
||||||
next();
|
next();
|
||||||
});
|
});
|
||||||
|
|
||||||
app.get('/default/bucket/:bucketName', (req, res) => {
|
this._app.get('/default/attributes/:bucketName', (req, res) => {
|
||||||
res.writeHead(200);
|
const key = req.params.bucketName;
|
||||||
res.write(req.body);
|
const bucket = this._bucketContent[key];
|
||||||
res.end();
|
if (bucket) {
|
||||||
|
res.status(200).send({
|
||||||
|
name: key,
|
||||||
|
owner: CANONICAL_ID,
|
||||||
|
ownerDisplayName: 'steve',
|
||||||
|
creationDate: new Date(),
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
res.statusMessage = 'DBNotFound';
|
||||||
|
res.status(404).end();
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
this._app.get('/default/bucket/:bucketName', (req, res) => {
|
||||||
|
res.status(200).send(req.body);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
start() {
|
start() {
|
||||||
this._initiateRoutes();
|
this._initiateRoutes();
|
||||||
const port = 9000;
|
const port = 9000;
|
||||||
this._server = http.createServer(app).listen(port);
|
this._server = http.createServer(this._app).listen(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
end() {
|
end() {
|
||||||
this._server.close();
|
this._server.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reset() {
|
||||||
|
this._bucketCount = 0;
|
||||||
|
this._bucketContent = {};
|
||||||
|
this._buckets = [];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = BucketD;
|
module.exports = BucketD;
|
||||||
|
|
|
@ -36,13 +36,14 @@
|
||||||
$operation_info 'labels' GET 'labels' STORE
|
$operation_info 'labels' GET 'labels' STORE
|
||||||
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
$operation_info 'start' GET TOLONG 'startTimestamp' STORE
|
||||||
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
$operation_info 'end' GET TOLONG 'endTimestamp' STORE
|
||||||
|
$operation_info 'node' GET 'nodeID' STORE
|
||||||
|
|
||||||
// 'Fetching metrics for ' $labels ->JSON + LOGMSG
|
// 'Fetching metrics for ' $labels ->JSON + LOGMSG
|
||||||
// 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG
|
// 'Time Range: ' $startTimestamp TOSTRING + ' ' + $endTimestamp TOSTRING + LOGMSG
|
||||||
|
|
||||||
$read_token $labels $startTimestamp @utapi/getMetricsAt 'startResults' STORE
|
$read_token $labels $startTimestamp $nodeID @utapi/getMetricsAt 'startResults' STORE
|
||||||
|
|
||||||
$read_token $labels $endTimestamp @utapi/getMetricsAt 'endResults' STORE
|
$read_token $labels $endTimestamp $nodeID @utapi/getMetricsAt 'endResults' STORE
|
||||||
|
|
||||||
// $startResults ->JSON LOGMSG
|
// $startResults ->JSON LOGMSG
|
||||||
// $endResults ->JSON LOGMSG
|
// $endResults ->JSON LOGMSG
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
<%
|
<%
|
||||||
'getMetricsAt' SECTION
|
'getMetricsAt' SECTION
|
||||||
|
|
||||||
|
'nodeID' STORE
|
||||||
'endTimestamp' STORE
|
'endTimestamp' STORE
|
||||||
'labels' STORE
|
'labels' STORE
|
||||||
'read_token' STORE
|
'read_token' STORE
|
||||||
|
@ -37,6 +38,7 @@
|
||||||
'utapi.checkpoint' 'checkpoint_class' STORE
|
'utapi.checkpoint' 'checkpoint_class' STORE
|
||||||
'utapi.snapshot' 'snapshot_class' STORE
|
'utapi.snapshot' 'snapshot_class' STORE
|
||||||
'utapi.repair.correction' 'correction_class' STORE
|
'utapi.repair.correction' 'correction_class' STORE
|
||||||
|
'utapi.repair.reindex' 'reindex_class' STORE
|
||||||
|
|
||||||
{} 'snapshots' STORE
|
{} 'snapshots' STORE
|
||||||
|
|
||||||
|
@ -248,6 +250,25 @@
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
%> FOREACH
|
%> FOREACH
|
||||||
|
|
||||||
|
'load_reindex' SECTION
|
||||||
|
// Only load the latest reindex for the current node
|
||||||
|
$labels UNMAP 'node' $nodeID } 'filterLabels' STORE
|
||||||
|
{
|
||||||
|
'token' $read_token
|
||||||
|
'class' $reindex_class
|
||||||
|
'labels' $filterLabels
|
||||||
|
'end' $endTimestamp
|
||||||
|
'count' 1
|
||||||
|
} FETCH
|
||||||
|
<% // Handle multiple GTS
|
||||||
|
VALUES
|
||||||
|
<% // For each reindex correction
|
||||||
|
@utapi/decodeRecord
|
||||||
|
// DUP 'Loaded reindex correction ' SWAP ->JSON + LOGMSG
|
||||||
|
$results @util/sumRecord 'results' STORE
|
||||||
|
%> FOREACH
|
||||||
|
%> FOREACH
|
||||||
|
|
||||||
$results // Leave results on the stack
|
$results // Leave results on the stack
|
||||||
|
|
||||||
%>
|
%>
|
||||||
|
|
|
@ -0,0 +1,3 @@
|
||||||
|
<%
|
||||||
|
EVAL @utapi/encodeRecord
|
||||||
|
%>
|
Loading…
Reference in New Issue