Compare commits

...

15 Commits

Author SHA1 Message Date
Taylor McKinnon c0dabdbf16 lint 2021-03-29 13:14:15 -07:00
Taylor McKinnon a553ff2df8 convert limit tests 2021-03-29 13:11:28 -07:00
Taylor McKinnon 607fc1aec3 fix storage handler 2021-03-29 13:08:17 -07:00
Taylor McKinnon c5096ac72a fix task config 2021-03-29 13:07:57 -07:00
Taylor McKinnon 5130705c20 convert functional tests 2021-03-29 13:07:30 -07:00
Taylor McKinnon 157c8a6ca7 convert unit tests 2021-03-29 12:36:48 -07:00
Taylor McKinnon 32eb8ab640 remove unused dep 2021-03-29 12:27:03 -07:00
Taylor McKinnon 52d53f544d protect task _join 2021-03-29 12:23:29 -07:00
Taylor McKinnon 9c9a1f85f5 build cacheClient 2021-03-29 12:21:33 -07:00
Taylor McKinnon fbc1057840 convert server apis 2021-03-29 12:21:19 -07:00
Taylor McKinnon ae35febc31 add ability to set `this` for handler functions allowing api clients to be built elsewhere 2021-03-29 12:07:17 -07:00
Taylor McKinnon fcf82fb889 centralise api controller creation to allow importing 2021-03-29 11:55:28 -07:00
Taylor McKinnon 347b21fdc4 convert tasks creation to pass in config 2021-03-29 11:54:19 -07:00
Taylor McKinnon 4ba0b26ef6 pass config into Process class instantiation 2021-03-29 11:53:43 -07:00
Taylor McKinnon e6a938b367 add client constructors 2021-03-29 10:58:20 -07:00
34 changed files with 224 additions and 105 deletions

View File

@ -1,13 +1,18 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'CreateCheckpoint',
});
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.CreateCheckpoint({ warp10: [warp10Clients[0]] });
const task = new tasks.CreateCheckpoint(taskConfig);
task.setup()
.then(() => logger.info('Starting checkpoint creation'))

View File

@ -1,12 +1,18 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'CreateSnapshot',
});
const task = new tasks.CreateSnapshot({ warp10: [warp10Clients[0]] });
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.CreateSnapshot(taskConfig);
task.setup()
.then(() => logger.info('Starting snapshot creation'))

View File

@ -1,13 +1,19 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'MonitorDiskUsage',
});
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.MonitorDiskUsage({ warp10: [warp10Clients[0]] });
const task = new tasks.MonitorDiskUsage(taskConfig);
task.setup()
.then(() => logger.info('Starting disk usage monitor'))

View File

@ -1,13 +1,12 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'IngestShard',
});
const task = new tasks.IngestShard({ warp10: warp10Clients });
const task = new tasks.IngestShard(config);
task.setup()
.then(() => logger.info('Starting shard ingestion'))

View File

@ -1,13 +1,12 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'ManualAdjust',
});
const task = new tasks.ManualAdjust({ warp10: warp10Clients });
const task = new tasks.ManualAdjust(config);
task.setup()
.then(() => logger.info('Starting manual adjustment'))

View File

@ -1,12 +1,18 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'Migrate',
});
const task = new tasks.MigrateTask({ warp10: [warp10Clients[0]] });
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.MigrateTask(taskConfig);
task.setup()
.then(() => logger.info('Starting utapi v1 => v2 migration'))

View File

@ -1,13 +1,18 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'Reindex',
});
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.ReindexTask({ warp10: [warp10Clients[0]] });
const task = new tasks.ReindexTask(taskConfig);
task.setup()
.then(() => logger.info('Starting Reindex daemon'))

View File

@ -1,13 +1,18 @@
const { tasks } = require('..');
const { LoggerContext } = require('../libV2/utils');
const { clients: warp10Clients } = require('../libV2/warp10');
const config = require('../libV2/config');
const logger = new LoggerContext({
task: 'Repair',
});
const taskConfig = config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
});
const task = new tasks.RepairTask({ warp10: [warp10Clients[0]] });
const task = new tasks.RepairTask(taskConfig);
task.setup()
.then(() => logger.info('Starting Repair daemon'))

View File

@ -1,9 +1,10 @@
const { startUtapiServer } = require('..');
const { LoggerContext } = require('../libV2/utils');
const config = require('../libV2/config');
const logger = new LoggerContext({ module: 'entrypoint' });
startUtapiServer().then(
startUtapiServer(config).then(
() => logger.info('utapi started'),
error => logger.error('Unhandled Error', { error }),
);

18
libV2/cache/index.js vendored
View File

@ -8,8 +8,20 @@ const cacheTypes = {
memory: () => new MemoryCache(),
};
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis);
function buildCacheClient(cacheConfig) {
const { backend, counter, cache } = cacheConfig;
return new CacheClient({
cacheBackend: cacheTypes[backend](cache),
counterBackend: cacheTypes[backend](counter),
});
}
// TODO remove after all users have been moved to buildCacheClient
const { cacheBackend, counterBackend } = buildCacheClient({
backend: config.cache.backend,
cache: config.cache,
counter: config.redis,
});
module.exports = {
CacheClient,
@ -17,5 +29,7 @@ module.exports = {
MemoryCache,
RedisCache,
},
// TODO remove after all users have been moved to buildCacheClient
client: new CacheClient({ cacheBackend, counterBackend }),
buildCacheClient,
};

View File

@ -350,7 +350,8 @@ class Config {
* @param {object} newConfig - an object using the same structure as the config file
* @returns {Config} - New Config instance
*/
static merge(newConfig) {
// eslint-disable-next-line class-methods-use-this
merge(newConfig) {
return new Config(newConfig);
}
}

View File

@ -4,8 +4,9 @@ const { Command } = require('commander');
const { logger } = require('./utils');
class Process extends EventEmitter {
constructor(...options) {
super(...options);
constructor(config) {
super();
this._config = config;
this._program = null;
}
@ -20,7 +21,7 @@ class Process extends EventEmitter {
cleanUpFunc();
});
this._program = new Command();
await this._setup();
await this._setup(this._config);
}
async start() {

View File

@ -0,0 +1,9 @@
const ApiController = require('../controller');
const metricController = new ApiController('metrics');
const internalController = new ApiController('internal');
module.exports = {
metricController,
internalController,
};

View File

@ -1,5 +1,3 @@
const ApiController = require('../controller');
const { internalController } = require('./controllers');
const controller = new ApiController('internal');
module.exports = controller.buildMap();
module.exports = internalController.buildMap();

View File

@ -1,5 +1,3 @@
const ApiController = require('../controller');
const { metricController } = require('./controllers');
const controller = new ApiController('metrics');
module.exports = controller.buildMap();
module.exports = metricController.buildMap();

View File

@ -1,7 +1,5 @@
const errors = require('../../../errors');
const { serviceToWarp10Label } = require('../../../constants');
const { clients: warp10Clients } = require('../../../warp10');
const { client: cache } = require('../../../cache');
const { now, iterIfError } = require('../../../utils');
/**
@ -20,7 +18,7 @@ async function getStorage(ctx, params) {
.customizeDescription(`Unsupported level "${level}". Only "accounts" is currently supported`);
}
const [counter, base] = await cache.fetchAccountSizeCounter(resource);
const [counter, base] = await this.cacheClient.fetchAccountSizeCounter(resource);
let storageUtilized;
@ -30,7 +28,7 @@ async function getStorage(ctx, params) {
const labelName = serviceToWarp10Label[params.level];
const labels = { [labelName]: resource };
const res = await iterIfError(warp10Clients, warp10 => {
const res = await iterIfError(this.warp10Clients, warp10 => {
const options = {
params: {
end: now(),
@ -48,7 +46,7 @@ async function getStorage(ctx, params) {
}
const { sizeD: currentSize } = res.result[0];
await cache.updateAccountCounterBase(resource, currentSize);
await this.cacheClient.updateAccountCounterBase(resource, currentSize);
storageUtilized = currentSize;
}

View File

@ -1,6 +1,5 @@
const errors = require('../../../errors');
const { UtapiMetric } = require('../../../models');
const { client: cacheClient } = require('../../../cache');
const { convertTimestamp } = require('../../../utils');
const { ingestionOpTranslationMap } = require('../../../constants');
@ -16,7 +15,7 @@ async function ingestMetric(ctx, params) {
throw errors.InvalidRequest;
}
try {
await Promise.all(metrics.map(m => cacheClient.pushMetric(m)));
await Promise.all(metrics.map(m => this.cacheClient.pushMetric(m)));
} catch (error) {
throw errors.ServiceUnavailable;
}

View File

@ -1,7 +1,6 @@
const errors = require('../../../errors');
const { serviceToWarp10Label, operationToResponse } = require('../../../constants');
const { convertTimestamp, iterIfError } = require('../../../utils');
const { clients: warp10Clients } = require('../../../warp10');
const emptyOperationsResponse = Object.values(operationToResponse)
.reduce((prev, key) => {
@ -33,7 +32,7 @@ async function listMetric(ctx, params) {
resources.map(async ({ resource, id }) => {
const labels = { [labelName]: id };
const res = await iterIfError(warp10Clients, warp10 => {
const res = await iterIfError(this.warp10Clients, warp10 => {
const options = {
params: {
start: convertTimestamp(start).toString(),

View File

@ -19,6 +19,7 @@ class APIController {
constructor(tag) {
this._handlers = APIController._collectHandlers(tag);
this._middleware = APIController._collectHandlerMiddleware(tag);
this._handlerEnvironment = {};
}
static _safeRequire(path) {
@ -176,13 +177,24 @@ class APIController {
* @returns {Object} - Map of operationIds to handler
*/
buildMap() {
this._built = true;
return Object.entries(this._handlers)
.reduce((ops, [id, handler]) => {
const _handler = handler.bind(this._handlerEnvironment);
ops[id] = (request, response, done) =>
APIController.callOperation(id, handler, this._middleware[id], request, response, done);
APIController.callOperation(id, _handler, this._middleware[id], request, response, done);
return ops;
}, {});
}
setHandlerEnvironment(env) {
if (this._built) {
throw new Error(
'The handler environment can not be changed after ApiController.buildMap() has been called',
);
}
this._handlerEnvironment = env;
}
}
module.exports = APIController;

View File

@ -5,23 +5,45 @@ const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process');
const config = require('../config');
const { initializeOasTools, middleware } = require('./middleware');
const { metricController } = require('./API/controllers');
const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache');
const { LoggerContext } = require('../utils');
const { buildWarp10Clients } = require('../warp10');
const { buildCacheClient } = require('../cache');
const moduleLogger = new LoggerContext({
module: 'server',
});
class UtapiServer extends Process {
constructor() {
super();
constructor(config) {
super(config);
this._app = null;
this._server = null;
}
_setupHandlerEnvironment() {
const warp10Clients = buildWarp10Clients(
this._config.warp10.hosts,
{
readToken: this._config.warp10.readToken,
writeToken: this._config.warp10.writeToken,
},
);
this._cacheClient = buildCacheClient({
backend: this._config.cache.backend,
cache: this._config.cache,
counter: this._config.redis,
});
metricController.setHandlerEnvironment({
warp10Clients,
cacheClient: this._cacheClient,
});
}
static async _createApp(spec) {
const app = express();
app.use(bodyParser.json({ strict: false }));
@ -32,13 +54,13 @@ class UtapiServer extends Process {
return app;
}
static _createHttpsAgent() {
_createHttpsAgent() {
const conf = {
ciphers: ciphers.ciphers,
dhparam,
cert: config.tls.cert,
key: config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null,
cert: this._config.tls.cert,
key: this._config.tls.key,
ca: this._config.tls.ca ? [this._config.tls.ca] : null,
requestCert: false,
rejectUnauthorized: true,
};
@ -47,36 +69,37 @@ class UtapiServer extends Process {
return conf;
}
static async _createServer(app) {
if (config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app);
async _createServer(app) {
if (this._config.tls) {
return https.createServer(this._createHttpsAgent(), app);
}
return http.createServer(app);
}
static async _startServer(server) {
async _startServer(server) {
moduleLogger
.with({
method: 'UtapiServer::_startServer',
cacheBackend: config.cacheBackend,
cacheBackend: this._config.cacheBackend,
})
.info(`Server listening on ${config.port}`);
await server.listen(config.port);
.info(`Server listening on ${this._config.port}`);
await server.listen(this._config.port);
}
async _setup() {
this._setupHandlerEnvironment();
this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app);
this._server = await this._createServer(this._app);
}
async _start() {
await cacheClient.connect();
await UtapiServer._startServer(this._server);
await this._cacheClient.connect();
await this._startServer(this._server);
}
async _join() {
await this._server.close();
await cacheClient.disconnect();
await this._cacheClient.disconnect();
}
}

View File

@ -1,10 +1,10 @@
const assert = require('assert');
const cron = require('node-schedule');
const cronparser = require('cron-parser');
const { client: cacheClient } = require('../cache');
const { buildCacheClient } = require('../cache');
const Process = require('../process');
const { LoggerContext, iterIfError } = require('../utils');
const { buildWarp10Clients } = require('../warp10');
const logger = new LoggerContext({
module: 'BaseTask',
@ -13,18 +13,24 @@ const logger = new LoggerContext({
class Now {}
class BaseTask extends Process {
constructor(options) {
super();
assert.notStrictEqual(options, undefined);
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
this._cache = cacheClient;
this._warp10Clients = options.warp10;
constructor(config) {
super(config);
this._cache = null;
this._warp10Clients = null;
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
}
async _setup(includeDefaultOpts = true) {
async _setup(config, includeDefaultOpts = true) {
this._nodeId = config.nodeId;
this._cache = buildCacheClient({
backend: config.cache.backend,
cache: config.cache,
counter: config.redis,
});
this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
if (includeDefaultOpts) {
this._program
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
@ -94,7 +100,9 @@ class BaseTask extends Process {
}
async _join() {
return this._cache.disconnect();
if (this._cache) {
await this._cache.disconnect();
}
}
withWarp10(func, onError) {

View File

@ -28,8 +28,8 @@ class MonitorDiskUsage extends BaseTask {
this._hardLimit = config.diskUsage.hardLimit || null;
}
async _setup() {
await super._setup();
async _setup(config) {
await super._setup(config);
this._program
.option('--leader', 'Mark this process as the leader for metric expiration.')
.option(

View File

@ -119,15 +119,26 @@ class Warp10Client {
}
}
const clients = _config.warp10.hosts.map(
val => new Warp10Client({
function buildWarp10Clients(hosts, tokens) {
return hosts.map(
val => new Warp10Client({
...tokens,
...val,
}),
);
}
// TODO Remove after all users have been moved to building their own clients
const clients = buildWarp10Clients(
_config.warp10.hosts,
{
readToken: _config.warp10.readToken,
writeToken: _config.warp10.writeToken,
...val,
}),
},
);
module.exports = {
Warp10Client,
clients,
buildWarp10Clients,
};

View File

@ -2,8 +2,8 @@ const assert = require('assert');
const sinon = require('sinon');
const uuid = require('uuid');
const { clients: warp10Clients } = require('../../../libV2/warp10');
const { MonitorDiskUsage } = require('../../../libV2/tasks');
const config = require('../../../libV2/config');
const { fillDir } = require('../../utils/v2Data');
@ -15,7 +15,7 @@ describe('Test MonitorDiskUsage hard limit', function () {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
task = new MonitorDiskUsage({ warp10: warp10Clients });
task = new MonitorDiskUsage(config);
await task.setup();
task._path = path;
task._enabled = true;

View File

@ -6,6 +6,7 @@ const { MonitorDiskUsage } = require('../../../libV2/tasks');
const { UtapiMetric } = require('../../../libV2/models');
const { now } = require('../../../libV2/utils');
const { expirationChunkDuration } = require('../../../libV2/constants');
const config = require('../../../libV2/config');
const { fetchRecords } = require('../../utils/v2Data');
@ -15,7 +16,7 @@ describe('Test MonitorDiskUsage soft limit', function () {
let task;
beforeEach(async () => {
task = new MonitorDiskUsage({ warp10: warp10Clients });
task = new MonitorDiskUsage(config);
await task.setup();
task._expirationEnabled = true;
task._program.leader = true;

View File

@ -3,7 +3,7 @@ const async = require('async');
const uuid = require('uuid');
const UtapiClient = require('../../../../libV2/client');
const { clients: warp10Clients } = require('../../../../libV2/warp10');
const { Warp10Client } = require('../../../../libV2/warp10');
const config = require('../../../../libV2/config');
const { CacheClient, backends: cacheBackends } = require('../../../../libV2/cache');
const { IngestShard } = require('../../../../libV2/tasks');
@ -20,7 +20,7 @@ const getClient = () => new CacheClient({
),
});
const warp10 = warp10Clients[0];
const warp10 = new Warp10Client();
// eslint-disable-next-line func-names
describe('Test getStorage handler', function () {
@ -49,9 +49,15 @@ describe('Test getStorage handler', function () {
cacheClient = getClient();
await cacheClient.connect();
ingestTask = new IngestShard({ warp10: [warp10Clients[0]] });
ingestTask = new IngestShard(
config.merge({
warp10: {
hosts: [config.warp10.hosts[0]],
},
}),
);
ingestTask._program = { lag: 0 };
await ingestTask._cache.connect();
await ingestTask.setup();
});
afterEach(async () => {

View File

@ -4,6 +4,7 @@ const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint } = require('../../../../libV2/tasks');
const config = require('../../../../libV2/config');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
@ -49,7 +50,8 @@ describe('Test CreateCheckpoint', function () {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
checkpointTask = new CreateCheckpoint(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await checkpointTask.setup();
checkpointTask._program = { lag: 0, nodeId: prefix };
});

View File

@ -4,6 +4,7 @@ const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
const { convertTimestamp } = require('../../../../libV2/utils');
const { CreateCheckpoint, CreateSnapshot, RepairTask } = require('../../../../libV2/tasks');
const config = require('../../../../libV2/config');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
@ -51,13 +52,16 @@ describe('Test CreateSnapshot', function () {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
checkpointTask = new CreateCheckpoint({ warp10: [warp10] });
checkpointTask = new CreateCheckpoint(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await checkpointTask.setup();
checkpointTask._program = { lag: 0, nodeId: prefix };
snapshotTask = new CreateSnapshot({ warp10: [warp10] });
snapshotTask = new CreateSnapshot(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await snapshotTask.setup();
snapshotTask._program = { lag: 0, nodeId: prefix };
repairTask = new RepairTask({ warp10: [warp10] });
repairTask = new RepairTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await repairTask.setup();
repairTask._program = { lag: 0, nodeId: prefix };
});

View File

@ -3,6 +3,7 @@ const uuid = require('uuid');
const { MonitorDiskUsage } = require('../../../../libV2/tasks');
const { getFolderSize } = require('../../../../libV2/utils');
const config = require('../../../../libV2/config');
const { fillDir } = require('../../../utils/v2Data');
@ -43,7 +44,7 @@ describe('Test MonitorDiskUsage', () => {
beforeEach(async () => {
path = `/tmp/diskusage-${uuid.v4()}`;
task = new MonitorDiskUsageShim({ warp10: [] });
task = new MonitorDiskUsageShim(config);
task._path = path;
task._enabled = true;
await task.setup();

View File

@ -58,7 +58,8 @@ describe('Test IngestShards', function () {
await cacheClient.connect();
warp10 = new Warp10Client({ nodeId: prefix });
ingestTask = new IngestShard({ warp10: [warp10] });
ingestTask = new IngestShard(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await ingestTask.setup();
ingestTask._cache._cacheBackend._prefix = prefix;
ingestTask._program = { lag: 0 };
await ingestTask._cache.connect();

View File

@ -4,6 +4,7 @@ const { mpuBucketPrefix } = require('arsenal').constants;
const { Warp10Client } = require('../../../../libV2/warp10');
const { ReindexTask } = require('../../../../libV2/tasks');
const config = require('../../../../libV2/config');
const { now } = require('../../../../libV2/utils');
const { BucketD, values } = require('../../../utils/mock/');
const { fetchRecords } = require('../../../utils/v2Data');
@ -32,10 +33,11 @@ describe('Test ReindexTask', function () {
before(() => bucketd.start());
beforeEach(() => {
beforeEach(async () => {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
reindexTask = new ReindexTask({ warp10: [warp10] });
reindexTask = new ReindexTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await reindexTask.setup();
reindexTask._program = { nodeId: prefix };
});

View File

@ -4,6 +4,7 @@ const uuid = require('uuid');
const { Warp10Client } = require('../../../../libV2/warp10');
const { convertTimestamp } = require('../../../../libV2/utils');
const { RepairTask } = require('../../../../libV2/tasks');
const config = require('../../../../libV2/config');
const { generateCustomEvents, fetchRecords } = require('../../../utils/v2Data');
@ -49,7 +50,8 @@ describe('Test Repair', function () {
beforeEach(async () => {
prefix = uuid.v4();
warp10 = new Warp10Client({ nodeId: prefix });
repairTask = new RepairTask({ warp10: [warp10] });
repairTask = new RepairTask(config.merge({ warp10: { hosts: [{ nodeId: prefix }] } }));
await repairTask.setup();
repairTask._program = { lag: 0, nodeId: prefix };
});

View File

@ -7,7 +7,6 @@ const UtapiClient = require('../../../libV2/client');
const { clients: warp10Clients } = require('../../../libV2/warp10');
const config = require('../../../libV2/config');
const { CacheClient, backends: cacheBackends } = require('../../../libV2/cache');
const { IngestShard } = require('../../../libV2/tasks');
const { now } = require('../../../libV2/utils');
const { generateCustomEvents } = require('../../utils/v2Data');
@ -104,7 +103,6 @@ describe('Test UtapiClient', function () {
let totals;
let cacheClient;
let ingestTask;
beforeEach(async () => {
client = new UtapiClient({
@ -121,10 +119,6 @@ describe('Test UtapiClient', function () {
cacheClient = getClient();
await cacheClient.connect();
ingestTask = new IngestShard({ warp10: [warp10] });
ingestTask._program = { lag: 0 };
await ingestTask._cache.connect();
});
it('should get the current storage for an account with a empty cache', async () => {

View File

@ -1,12 +1,15 @@
const assert = require('assert');
const sinon = require('sinon');
const ingestMetric = require('../../../../../libV2/server/API/metrics/ingestMetric');
const { client: cacheClient } = require('../../../../../libV2/cache');
const { buildCacheClient } = require('../../../../../libV2/cache');
const { convertTimestamp } = require('../../../../../libV2/utils');
const { UtapiMetric } = require('../../../../../libV2/models');
const { generateFakeEvents, templateContext } = require('../../../../utils/v2Data');
const events = generateFakeEvents(1, 50, 50);
const cacheClient = buildCacheClient({ backend: 'memory' });
const _ingestMetric = ingestMetric.bind({ cacheClient });
describe('Test ingestMetric', () => {
let ctx;
@ -18,7 +21,7 @@ describe('Test ingestMetric', () => {
it('should ingest metrics', async () => {
const spy = sinon.spy(cacheClient, 'pushMetric');
await ingestMetric(ctx, { body: events.map(ev => ev.getValue()) });
await _ingestMetric(ctx, { body: events.map(ev => ev.getValue()) });
assert.strictEqual(ctx.results.statusCode, 200);
events.forEach(ev => {
assert(spy.calledWith(
@ -32,14 +35,14 @@ describe('Test ingestMetric', () => {
it('should throw InvalidRequest if metric data is invalid',
() => assert.rejects(
ingestMetric(ctx, { body: [{ operationId: 'invalid' }] }),
_ingestMetric(ctx, { body: [{ operationId: 'invalid' }] }),
err => err.code === 400 && err.InvalidRequest,
));
it('should throw ServiceUnavailable if the cache client encounters an error', () => {
sinon.stub(cacheClient, 'pushMetric').rejects();
return assert.rejects(
ingestMetric(ctx, { body: events.map(ev => ev.getValue()) }),
_ingestMetric(ctx, { body: events.map(ev => ev.getValue()) }),
err => err.code === 503 && err.ServiceUnavailable,
);
});
@ -49,7 +52,7 @@ describe('Test ingestMetric', () => {
const metric = new UtapiMetric({
operationId: 'putDeleteMarkerObject',
});
await ingestMetric(ctx, { body: [metric.getValue()] });
await _ingestMetric(ctx, { body: [metric.getValue()] });
assert.strictEqual(ctx.results.statusCode, 200);
assert(spy.calledWith(
new UtapiMetric({