Compare commits
15 Commits
developmen
...
refactor/S
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | c0dabdbf16 | |
Taylor McKinnon | a553ff2df8 | |
Taylor McKinnon | 607fc1aec3 | |
Taylor McKinnon | c5096ac72a | |
Taylor McKinnon | 5130705c20 | |
Taylor McKinnon | 157c8a6ca7 | |
Taylor McKinnon | 32eb8ab640 | |
Taylor McKinnon | 52d53f544d | |
Taylor McKinnon | 9c9a1f85f5 | |
Taylor McKinnon | fbc1057840 | |
Taylor McKinnon | ae35febc31 | |
Taylor McKinnon | fcf82fb889 | |
Taylor McKinnon | 347b21fdc4 | |
Taylor McKinnon | 4ba0b26ef6 | |
Taylor McKinnon | e6a938b367 |
|
@ -1,13 +1,18 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'CreateCheckpoint',
|
||||
});
|
||||
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
|
||||
const task = new tasks.CreateCheckpoint(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting checkpoint creation'))
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'CreateSnapshot',
|
||||
});
|
||||
|
||||
const task = new tasks.CreateSnapshot({ warp10: [warp10Clients[0]] });
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.CreateSnapshot(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting snapshot creation'))
|
||||
|
|
|
@ -1,13 +1,19 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'MonitorDiskUsage',
|
||||
});
|
||||
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.MonitorDiskUsage({ warp10: [warp10Clients[0]] });
|
||||
|
||||
const task = new tasks.MonitorDiskUsage(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting disk usage monitor'))
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'IngestShard',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.IngestShard({ warp10: warp10Clients });
|
||||
const task = new tasks.IngestShard(config);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting shard ingestion'))
|
||||
|
|
|
@ -1,13 +1,12 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'ManualAdjust',
|
||||
});
|
||||
|
||||
|
||||
const task = new tasks.ManualAdjust({ warp10: warp10Clients });
|
||||
const task = new tasks.ManualAdjust(config);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting manual adjustment'))
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Migrate',
|
||||
});
|
||||
|
||||
const task = new tasks.MigrateTask({ warp10: [warp10Clients[0]] });
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.MigrateTask(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting utapi v1 => v2 migration'))
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Reindex',
|
||||
});
|
||||
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.ReindexTask({ warp10: [warp10Clients[0]] });
|
||||
const task = new tasks.ReindexTask(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting Reindex daemon'))
|
||||
|
|
|
@ -1,13 +1,18 @@
|
|||
const { tasks } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const { clients: warp10Clients } = require('../libV2/warp10');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
task: 'Repair',
|
||||
});
|
||||
|
||||
const taskConfig = config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
});
|
||||
|
||||
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
|
||||
const task = new tasks.RepairTask(taskConfig);
|
||||
|
||||
task.setup()
|
||||
.then(() => logger.info('Starting Repair daemon'))
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
const { startUtapiServer } = require('..');
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
const config = require('../libV2/config');
|
||||
|
||||
const logger = new LoggerContext({ module: 'entrypoint' });
|
||||
|
||||
startUtapiServer().then(
|
||||
startUtapiServer(config).then(
|
||||
() => logger.info('utapi started'),
|
||||
error => logger.error('Unhandled Error', { error }),
|
||||
);
|
||||
|
|
|
@ -8,8 +8,20 @@ const cacheTypes = {
|
|||
memory: () => new MemoryCache(),
|
||||
};
|
||||
|
||||
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
|
||||
const counterBackend = cacheTypes[config.cache.backend](config.redis);
|
||||
function buildCacheClient(cacheConfig) {
|
||||
const { backend, counter, cache } = cacheConfig;
|
||||
return new CacheClient({
|
||||
cacheBackend: cacheTypes[backend](cache),
|
||||
counterBackend: cacheTypes[backend](counter),
|
||||
});
|
||||
}
|
||||
|
||||
// TODO remove after all users have been moved to buildCacheClient
|
||||
const { cacheBackend, counterBackend } = buildCacheClient({
|
||||
backend: config.cache.backend,
|
||||
cache: config.cache,
|
||||
counter: config.redis,
|
||||
});
|
||||
|
||||
module.exports = {
|
||||
CacheClient,
|
||||
|
@ -17,5 +29,7 @@ module.exports = {
|
|||
MemoryCache,
|
||||
RedisCache,
|
||||
},
|
||||
// TODO remove after all users have been moved to buildCacheClient
|
||||
client: new CacheClient({ cacheBackend, counterBackend }),
|
||||
buildCacheClient,
|
||||
};
|
||||
|
|
|
@ -350,7 +350,8 @@ class Config {
|
|||
* @param {object} newConfig - an object using the same structure as the config file
|
||||
* @returns {Config} - New Config instance
|
||||
*/
|
||||
static merge(newConfig) {
|
||||
// eslint-disable-next-line class-methods-use-this
|
||||
merge(newConfig) {
|
||||
return new Config(newConfig);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,8 +4,9 @@ const { Command } = require('commander');
|
|||
const { logger } = require('./utils');
|
||||
|
||||
class Process extends EventEmitter {
|
||||
constructor(...options) {
|
||||
super(...options);
|
||||
constructor(config) {
|
||||
super();
|
||||
this._config = config;
|
||||
this._program = null;
|
||||
}
|
||||
|
||||
|
@ -20,7 +21,7 @@ class Process extends EventEmitter {
|
|||
cleanUpFunc();
|
||||
});
|
||||
this._program = new Command();
|
||||
await this._setup();
|
||||
await this._setup(this._config);
|
||||
}
|
||||
|
||||
async start() {
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
const ApiController = require('../controller');
|
||||
|
||||
const metricController = new ApiController('metrics');
|
||||
const internalController = new ApiController('internal');
|
||||
|
||||
module.exports = {
|
||||
metricController,
|
||||
internalController,
|
||||
};
|
|
@ -1,5 +1,3 @@
|
|||
const ApiController = require('../controller');
|
||||
const { internalController } = require('./controllers');
|
||||
|
||||
const controller = new ApiController('internal');
|
||||
|
||||
module.exports = controller.buildMap();
|
||||
module.exports = internalController.buildMap();
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
const ApiController = require('../controller');
|
||||
const { metricController } = require('./controllers');
|
||||
|
||||
const controller = new ApiController('metrics');
|
||||
|
||||
module.exports = controller.buildMap();
|
||||
module.exports = metricController.buildMap();
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
const errors = require('../../../errors');
|
||||
const { serviceToWarp10Label } = require('../../../constants');
|
||||
const { clients: warp10Clients } = require('../../../warp10');
|
||||
const { client: cache } = require('../../../cache');
|
||||
const { now, iterIfError } = require('../../../utils');
|
||||
|
||||
/**
|
||||
|
@ -20,7 +18,7 @@ async function getStorage(ctx, params) {
|
|||
.customizeDescription(`Unsupported level "${level}". Only "accounts" is currently supported`);
|
||||
}
|
||||
|
||||
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
|
||||
const [counter, base] = await this.cacheClient.fetchAccountSizeCounter(resource);
|
||||
|
||||
let storageUtilized;
|
||||
|
||||
|
@ -30,7 +28,7 @@ async function getStorage(ctx, params) {
|
|||
const labelName = serviceToWarp10Label[params.level];
|
||||
const labels = { [labelName]: resource };
|
||||
|
||||
const res = await iterIfError(warp10Clients, warp10 => {
|
||||
const res = await iterIfError(this.warp10Clients, warp10 => {
|
||||
const options = {
|
||||
params: {
|
||||
end: now(),
|
||||
|
@ -48,7 +46,7 @@ async function getStorage(ctx, params) {
|
|||
}
|
||||
|
||||
const { sizeD: currentSize } = res.result[0];
|
||||
await cache.updateAccountCounterBase(resource, currentSize);
|
||||
await this.cacheClient.updateAccountCounterBase(resource, currentSize);
|
||||
storageUtilized = currentSize;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
const errors = require('../../../errors');
|
||||
const { UtapiMetric } = require('../../../models');
|
||||
const { client: cacheClient } = require('../../../cache');
|
||||
const { convertTimestamp } = require('../../../utils');
|
||||
const { ingestionOpTranslationMap } = require('../../../constants');
|
||||
|
||||
|
@ -16,7 +15,7 @@ async function ingestMetric(ctx, params) {
|
|||
throw errors.InvalidRequest;
|
||||
}
|
||||
try {
|
||||
await Promise.all(metrics.map(m => cacheClient.pushMetric(m)));
|
||||
await Promise.all(metrics.map(m => this.cacheClient.pushMetric(m)));
|
||||
} catch (error) {
|
||||
throw errors.ServiceUnavailable;
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
const errors = require('../../../errors');
|
||||
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
|
||||
const { convertTimestamp, iterIfError } = require('../../../utils');
|
||||
const { clients: warp10Clients } = require('../../../warp10');
|
||||
|
||||
const emptyOperationsResponse = Object.values(operationToResponse)
|
||||
.reduce((prev, key) => {
|
||||
|
@ -33,7 +32,7 @@ async function listMetric(ctx, params) {
|
|||
resources.map(async ({ resource, id }) => {
|
||||
const labels = { [labelName]: id };
|
||||
|
||||
const res = await iterIfError(warp10Clients, warp10 => {
|
||||
const res = await iterIfError(this.warp10Clients, warp10 => {
|
||||
const options = {
|
||||
params: {
|
||||
start: convertTimestamp(start).toString(),
|
||||
|
|
|
@ -19,6 +19,7 @@ class APIController {
|
|||
constructor(tag) {
|
||||
this._handlers = APIController._collectHandlers(tag);
|
||||
this._middleware = APIController._collectHandlerMiddleware(tag);
|
||||
this._handlerEnvironment = {};
|
||||
}
|
||||
|
||||
static _safeRequire(path) {
|
||||
|
@ -176,13 +177,24 @@ class APIController {
|
|||
* @returns {Object} - Map of operationIds to handler
|
||||
*/
|
||||
buildMap() {
|
||||
this._built = true;
|
||||
return Object.entries(this._handlers)
|
||||
.reduce((ops, [id, handler]) => {
|
||||
const _handler = handler.bind(this._handlerEnvironment);
|
||||
ops[id] = (request, response, done) =>
|
||||
APIController.callOperation(id, handler, this._middleware[id], request, response, done);
|
||||
APIController.callOperation(id, _handler, this._middleware[id], request, response, done);
|
||||
return ops;
|
||||
}, {});
|
||||
}
|
||||
|
||||
setHandlerEnvironment(env) {
|
||||
if (this._built) {
|
||||
throw new Error(
|
||||
'The handler environment can not be changed after ApiController.buildMap() has been called',
|
||||
);
|
||||
}
|
||||
this._handlerEnvironment = env;
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = APIController;
|
||||
|
|
|
@ -5,23 +5,45 @@ const bodyParser = require('body-parser');
|
|||
const { ciphers, dhparam } = require('arsenal').https;
|
||||
|
||||
const Process = require('../process');
|
||||
const config = require('../config');
|
||||
const { initializeOasTools, middleware } = require('./middleware');
|
||||
const { metricController } = require('./API/controllers');
|
||||
const { spec: apiSpec } = require('./spec');
|
||||
const { client: cacheClient } = require('../cache');
|
||||
const { LoggerContext } = require('../utils');
|
||||
const { buildWarp10Clients } = require('../warp10');
|
||||
const { buildCacheClient } = require('../cache');
|
||||
|
||||
const moduleLogger = new LoggerContext({
|
||||
module: 'server',
|
||||
});
|
||||
|
||||
class UtapiServer extends Process {
|
||||
constructor() {
|
||||
super();
|
||||
constructor(config) {
|
||||
super(config);
|
||||
this._app = null;
|
||||
this._server = null;
|
||||
}
|
||||
|
||||
_setupHandlerEnvironment() {
|
||||
const warp10Clients = buildWarp10Clients(
|
||||
this._config.warp10.hosts,
|
||||
{
|
||||
readToken: this._config.warp10.readToken,
|
||||
writeToken: this._config.warp10.writeToken,
|
||||
},
|
||||
);
|
||||
|
||||
this._cacheClient = buildCacheClient({
|
||||
backend: this._config.cache.backend,
|
||||
cache: this._config.cache,
|
||||
counter: this._config.redis,
|
||||
});
|
||||
|
||||
metricController.setHandlerEnvironment({
|
||||
warp10Clients,
|
||||
cacheClient: this._cacheClient,
|
||||
});
|
||||
}
|
||||
|
||||
static async _createApp(spec) {
|
||||
const app = express();
|
||||
app.use(bodyParser.json({ strict: false }));
|
||||
|
@ -32,13 +54,13 @@ class UtapiServer extends Process {
|
|||
return app;
|
||||
}
|
||||
|
||||
static _createHttpsAgent() {
|
||||
_createHttpsAgent() {
|
||||
const conf = {
|
||||
ciphers: ciphers.ciphers,
|
||||
dhparam,
|
||||
cert: config.tls.cert,
|
||||
key: config.tls.key,
|
||||
ca: config.tls.ca ? [config.tls.ca] : null,
|
||||
cert: this._config.tls.cert,
|
||||
key: this._config.tls.key,
|
||||
ca: this._config.tls.ca ? [this._config.tls.ca] : null,
|
||||
requestCert: false,
|
||||
rejectUnauthorized: true,
|
||||
};
|
||||
|
@ -47,36 +69,37 @@ class UtapiServer extends Process {
|
|||
return conf;
|
||||
}
|
||||
|
||||
static async _createServer(app) {
|
||||
if (config.tls) {
|
||||
return https.createServer(UtapiServer._createHttpsAgent(), app);
|
||||
async _createServer(app) {
|
||||
if (this._config.tls) {
|
||||
return https.createServer(this._createHttpsAgent(), app);
|
||||
}
|
||||
return http.createServer(app);
|
||||
}
|
||||
|
||||
static async _startServer(server) {
|
||||
async _startServer(server) {
|
||||
moduleLogger
|
||||
.with({
|
||||
method: 'UtapiServer::_startServer',
|
||||
cacheBackend: config.cacheBackend,
|
||||
cacheBackend: this._config.cacheBackend,
|
||||
})
|
||||
.info(`Server listening on ${config.port}`);
|
||||
await server.listen(config.port);
|
||||
.info(`Server listening on ${this._config.port}`);
|
||||
await server.listen(this._config.port);
|
||||
}
|
||||
|
||||
async _setup() {
|
||||
this._setupHandlerEnvironment();
|
||||
this._app = await UtapiServer._createApp(apiSpec);
|
||||
this._server = await UtapiServer._createServer(this._app);
|
||||
this._server = await this._createServer(this._app);
|
||||
}
|
||||
|
||||
async _start() {
|
||||
await cacheClient.connect();
|
||||
await UtapiServer._startServer(this._server);
|
||||
await this._cacheClient.connect();
|
||||
await this._startServer(this._server);
|
||||
}
|
||||
|
||||
async _join() {
|
||||
await this._server.close();
|
||||
await cacheClient.disconnect();
|
||||
await this._cacheClient.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
const assert = require('assert');
|
||||
const cron = require('node-schedule');
|
||||
const cronparser = require('cron-parser');
|
||||
|
||||
const { client: cacheClient } = require('../cache');
|
||||
const { buildCacheClient } = require('../cache');
|
||||
const Process = require('../process');
|
||||
const { LoggerContext, iterIfError } = require('../utils');
|
||||
const { buildWarp10Clients } = require('../warp10');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'BaseTask',
|
||||
|
@ -13,18 +13,24 @@ const logger = new LoggerContext({
|
|||
class Now {}
|
||||
|
||||
class BaseTask extends Process {
|
||||
constructor(options) {
|
||||
super();
|
||||
assert.notStrictEqual(options, undefined);
|
||||
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
|
||||
this._cache = cacheClient;
|
||||
this._warp10Clients = options.warp10;
|
||||
constructor(config) {
|
||||
super(config);
|
||||
this._cache = null;
|
||||
this._warp10Clients = null;
|
||||
this._scheduler = null;
|
||||
this._defaultSchedule = Now;
|
||||
this._defaultLag = 0;
|
||||
}
|
||||
|
||||
async _setup(includeDefaultOpts = true) {
|
||||
async _setup(config, includeDefaultOpts = true) {
|
||||
this._nodeId = config.nodeId;
|
||||
this._cache = buildCacheClient({
|
||||
backend: config.cache.backend,
|
||||
cache: config.cache,
|
||||
counter: config.redis,
|
||||
});
|
||||
this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
|
||||
|
||||
if (includeDefaultOpts) {
|
||||
this._program
|
||||
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
|
||||
|
@ -94,7 +100,9 @@ class BaseTask extends Process {
|
|||
}
|
||||
|
||||
async _join() {
|
||||
return this._cache.disconnect();
|
||||
if (this._cache) {
|
||||
await this._cache.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
withWarp10(func, onError) {
|
||||
|
|
|
@ -28,8 +28,8 @@ class MonitorDiskUsage extends BaseTask {
|
|||
this._hardLimit = config.diskUsage.hardLimit || null;
|
||||
}
|
||||
|
||||
async _setup() {
|
||||
await super._setup();
|
||||
async _setup(config) {
|
||||
await super._setup(config);
|
||||
this._program
|
||||
.option('--leader', 'Mark this process as the leader for metric expiration.')
|
||||
.option(
|
||||
|
|
|
@ -119,15 +119,26 @@ class Warp10Client {
|
|||
}
|
||||
}
|
||||
|
||||
const clients = _config.warp10.hosts.map(
|
||||
val => new Warp10Client({
|
||||
function buildWarp10Clients(hosts, tokens) {
|
||||
return hosts.map(
|
||||
val => new Warp10Client({
|
||||
...tokens,
|
||||
...val,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// TODO Remove after all users have been moved to building their own clients
|
||||
const clients = buildWarp10Clients(
|
||||
_config.warp10.hosts,
|
||||
{
|
||||
readToken: _config.warp10.readToken,
|
||||
writeToken: _config.warp10.writeToken,
|
||||
...val,
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
module.exports = {
|
||||
Warp10Client,
|
||||
clients,
|
||||
buildWarp10Clients,
|
||||
};
|
||||
|
|
|
@ -2,8 +2,8 @@ const assert = require('assert');
|
|||
const sinon = require('sinon');
|
||||
const uuid = require('uuid');
|
||||
|
||||
const { clients: warp10Clients } = require('../../../libV2/warp10');
|
||||
const { MonitorDiskUsage } = require('../../../libV2/tasks');
|
||||
const config = require('../../../libV2/config');
|
||||
|
||||
const { fillDir } = require('../../utils/v2Data');
|
||||
|
||||
|
@ -15,7 +15,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
|
|||
|
||||
beforeEach(async () => {
|
||||
path = `/tmp/diskusage-${uuid.v4()}`;
|
||||
task = new MonitorDiskUsage({ warp10: warp10Clients });
|
||||
task = new MonitorDiskUsage(config);
|
||||
await task.setup();
|
||||
task._path = path;
|
||||
task._enabled = true;
|
||||
|
|
|
@ -6,6 +6,7 @@ const { MonitorDiskUsage } = require('../../../libV2/tasks');
|
|||
const { UtapiMetric } = require('../../../libV2/models');
|
||||
const { now } = require('../../../libV2/utils');
|
||||
const { expirationChunkDuration } = require('../../../libV2/constants');
|
||||
const config = require('../../../libV2/config');
|
||||
|
||||
const { fetchRecords } = require('../../utils/v2Data');
|
||||
|
||||
|
@ -15,7 +16,7 @@ describe('Test MonitorDiskUsage soft limit', function () {
|
|||
let task;
|
||||
|
||||
beforeEach(async () => {
|
||||
task = new MonitorDiskUsage({ warp10: warp10Clients });
|
||||
task = new MonitorDiskUsage(config);
|
||||
await task.setup();
|
||||
task._expirationEnabled = true;
|
||||
task._program.leader = true;
|
||||
|
|
|
@ -3,7 +3,7 @@ const async = require('async');
|
|||
const uuid = require('uuid');
|
||||
|
||||
const UtapiClient = require('../../../../libV2/client');
|
||||
const { clients: warp10Clients } = require('../../../../libV2/warp10');
|
||||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const config = require('../../../../libV2/config');
|
||||
const { CacheClient, backends: cacheBackends } = require('../../../../libV2/cache');
|
||||
const { IngestShard } = require('../../../../libV2/tasks');
|
||||
|
@ -20,7 +20,7 @@ const getClient = () => new CacheClient({
|
|||
),
|
||||
});
|
||||
|
||||
const warp10 = warp10Clients[0];
|
||||
const warp10 = new Warp10Client();
|
||||
|
||||
// eslint-disable-next-line func-names
|
||||
describe('Test getStorage handler', function () {
|
||||
|
@ -49,9 +49,15 @@ describe('Test getStorage handler', function () {
|
|||
cacheClient = getClient();
|
||||
await cacheClient.connect();
|
||||
|
||||
ingestTask = new IngestShard({ warp10: [warp10Clients[0]] });
|
||||
ingestTask = new IngestShard(
|
||||
config.merge({
|
||||
warp10: {
|
||||
hosts: [config.warp10.hosts[0]],
|
||||
},
|
||||
}),
|
||||
);
|
||||
ingestTask._program = { lag: 0 };
|
||||
await ingestTask._cache.connect();
|
||||
await ingestTask.setup();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
|
|
|
@ -4,6 +4,7 @@ const uuid = require('uuid');
|
|||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const { convertTimestamp } = require('../../../../libV2/utils');
|
||||
const { CreateCheckpoint } = require('../../../../libV2/tasks');
|
||||
const config = require('../../../../libV2/config');
|
||||
|
||||
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
|
||||
|
||||
|
@ -49,7 +50,8 @@ describe('Test CreateCheckpoint', function () {
|
|||
prefix = uuid.v4();
|
||||
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
|
||||
checkpointTask = new CreateCheckpoint(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await checkpointTask.setup();
|
||||
checkpointTask._program = { lag: 0, nodeId: prefix };
|
||||
});
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ const uuid = require('uuid');
|
|||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const { convertTimestamp } = require('../../../../libV2/utils');
|
||||
const { CreateCheckpoint, CreateSnapshot, RepairTask } = require('../../../../libV2/tasks');
|
||||
const config = require('../../../../libV2/config');
|
||||
|
||||
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
|
||||
|
||||
|
@ -51,13 +52,16 @@ describe('Test CreateSnapshot', function () {
|
|||
prefix = uuid.v4();
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
|
||||
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
|
||||
checkpointTask = new CreateCheckpoint(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await checkpointTask.setup();
|
||||
checkpointTask._program = { lag: 0, nodeId: prefix };
|
||||
|
||||
snapshotTask = new CreateSnapshot({ warp10: [warp10] });
|
||||
snapshotTask = new CreateSnapshot(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await snapshotTask.setup();
|
||||
snapshotTask._program = { lag: 0, nodeId: prefix };
|
||||
|
||||
repairTask = new RepairTask({ warp10: [warp10] });
|
||||
repairTask = new RepairTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await repairTask.setup();
|
||||
repairTask._program = { lag: 0, nodeId: prefix };
|
||||
});
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ const uuid = require('uuid');
|
|||
|
||||
const { MonitorDiskUsage } = require('../../../../libV2/tasks');
|
||||
const { getFolderSize } = require('../../../../libV2/utils');
|
||||
const config = require('../../../../libV2/config');
|
||||
|
||||
const { fillDir } = require('../../../utils/v2Data');
|
||||
|
||||
|
@ -43,7 +44,7 @@ describe('Test MonitorDiskUsage', () => {
|
|||
|
||||
beforeEach(async () => {
|
||||
path = `/tmp/diskusage-${uuid.v4()}`;
|
||||
task = new MonitorDiskUsageShim({ warp10: [] });
|
||||
task = new MonitorDiskUsageShim(config);
|
||||
task._path = path;
|
||||
task._enabled = true;
|
||||
await task.setup();
|
||||
|
|
|
@ -58,7 +58,8 @@ describe('Test IngestShards', function () {
|
|||
await cacheClient.connect();
|
||||
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
ingestTask = new IngestShard({ warp10: [warp10] });
|
||||
ingestTask = new IngestShard(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await ingestTask.setup();
|
||||
ingestTask._cache._cacheBackend._prefix = prefix;
|
||||
ingestTask._program = { lag: 0 };
|
||||
await ingestTask._cache.connect();
|
||||
|
|
|
@ -4,6 +4,7 @@ const { mpuBucketPrefix } = require('arsenal').constants;
|
|||
|
||||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const { ReindexTask } = require('../../../../libV2/tasks');
|
||||
const config = require('../../../../libV2/config');
|
||||
const { now } = require('../../../../libV2/utils');
|
||||
const { BucketD, values } = require('../../../utils/mock/');
|
||||
const { fetchRecords } = require('../../../utils/v2Data');
|
||||
|
@ -32,10 +33,11 @@ describe('Test ReindexTask', function () {
|
|||
|
||||
before(() => bucketd.start());
|
||||
|
||||
beforeEach(() => {
|
||||
beforeEach(async () => {
|
||||
prefix = uuid.v4();
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
reindexTask = new ReindexTask({ warp10: [warp10] });
|
||||
reindexTask = new ReindexTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await reindexTask.setup();
|
||||
reindexTask._program = { nodeId: prefix };
|
||||
});
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ const uuid = require('uuid');
|
|||
const { Warp10Client } = require('../../../../libV2/warp10');
|
||||
const { convertTimestamp } = require('../../../../libV2/utils');
|
||||
const { RepairTask } = require('../../../../libV2/tasks');
|
||||
const config = require('../../../../libV2/config');
|
||||
|
||||
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
|
||||
|
||||
|
@ -49,7 +50,8 @@ describe('Test Repair', function () {
|
|||
beforeEach(async () => {
|
||||
prefix = uuid.v4();
|
||||
warp10 = new Warp10Client({ nodeId: prefix });
|
||||
repairTask = new RepairTask({ warp10: [warp10] });
|
||||
repairTask = new RepairTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
|
||||
await repairTask.setup();
|
||||
repairTask._program = { lag: 0, nodeId: prefix };
|
||||
});
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ const UtapiClient = require('../../../libV2/client');
|
|||
const { clients: warp10Clients } = require('../../../libV2/warp10');
|
||||
const config = require('../../../libV2/config');
|
||||
const { CacheClient, backends: cacheBackends } = require('../../../libV2/cache');
|
||||
const { IngestShard } = require('../../../libV2/tasks');
|
||||
const { now } = require('../../../libV2/utils');
|
||||
const { generateCustomEvents } = require('../../utils/v2Data');
|
||||
|
||||
|
@ -104,7 +103,6 @@ describe('Test UtapiClient', function () {
|
|||
let totals;
|
||||
|
||||
let cacheClient;
|
||||
let ingestTask;
|
||||
|
||||
beforeEach(async () => {
|
||||
client = new UtapiClient({
|
||||
|
@ -121,10 +119,6 @@ describe('Test UtapiClient', function () {
|
|||
|
||||
cacheClient = getClient();
|
||||
await cacheClient.connect();
|
||||
|
||||
ingestTask = new IngestShard({ warp10: [warp10] });
|
||||
ingestTask._program = { lag: 0 };
|
||||
await ingestTask._cache.connect();
|
||||
});
|
||||
|
||||
it('should get the current storage for an account with a empty cache', async () => {
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
const assert = require('assert');
|
||||
const sinon = require('sinon');
|
||||
const ingestMetric = require('../../../../../libV2/server/API/metrics/ingestMetric');
|
||||
const { client: cacheClient } = require('../../../../../libV2/cache');
|
||||
const { buildCacheClient } = require('../../../../../libV2/cache');
|
||||
const { convertTimestamp } = require('../../../../../libV2/utils');
|
||||
const { UtapiMetric } = require('../../../../../libV2/models');
|
||||
const { generateFakeEvents, templateContext } = require('../../../../utils/v2Data');
|
||||
|
||||
const events = generateFakeEvents(1, 50, 50);
|
||||
const cacheClient = buildCacheClient({ backend: 'memory' });
|
||||
|
||||
const _ingestMetric = ingestMetric.bind({ cacheClient });
|
||||
|
||||
describe('Test ingestMetric', () => {
|
||||
let ctx;
|
||||
|
@ -18,7 +21,7 @@ describe('Test ingestMetric', () => {
|
|||
|
||||
it('should ingest metrics', async () => {
|
||||
const spy = sinon.spy(cacheClient, 'pushMetric');
|
||||
await ingestMetric(ctx, { body: events.map(ev => ev.getValue()) });
|
||||
await _ingestMetric(ctx, { body: events.map(ev => ev.getValue()) });
|
||||
assert.strictEqual(ctx.results.statusCode, 200);
|
||||
events.forEach(ev => {
|
||||
assert(spy.calledWith(
|
||||
|
@ -32,14 +35,14 @@ describe('Test ingestMetric', () => {
|
|||
|
||||
it('should throw InvalidRequest if metric data is invalid',
|
||||
() => assert.rejects(
|
||||
ingestMetric(ctx, { body: [{ operationId: 'invalid' }] }),
|
||||
_ingestMetric(ctx, { body: [{ operationId: 'invalid' }] }),
|
||||
err => err.code === 400 && err.InvalidRequest,
|
||||
));
|
||||
|
||||
it('should throw ServiceUnavailable if the cache client encounters an error', () => {
|
||||
sinon.stub(cacheClient, 'pushMetric').rejects();
|
||||
return assert.rejects(
|
||||
ingestMetric(ctx, { body: events.map(ev => ev.getValue()) }),
|
||||
_ingestMetric(ctx, { body: events.map(ev => ev.getValue()) }),
|
||||
err => err.code === 503 && err.ServiceUnavailable,
|
||||
);
|
||||
});
|
||||
|
@ -49,7 +52,7 @@ describe('Test ingestMetric', () => {
|
|||
const metric = new UtapiMetric({
|
||||
operationId: 'putDeleteMarkerObject',
|
||||
});
|
||||
await ingestMetric(ctx, { body: [metric.getValue()] });
|
||||
await _ingestMetric(ctx, { body: [metric.getValue()] });
|
||||
assert.strictEqual(ctx.results.statusCode, 200);
|
||||
assert(spy.calledWith(
|
||||
new UtapiMetric({
|
||||
|
|
Loading…
Reference in New Issue