Compare commits
1 Commits
developmen
...
feature/un
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 6628962b6b |
|
@ -0,0 +1,25 @@
|
||||||
|
#!/usr/bin/env node
|
||||||
|
const Process = require('../libV2/process');
|
||||||
|
|
||||||
|
const { LoggerContext } = require('../libV2/utils');
|
||||||
|
|
||||||
|
const logger = new LoggerContext({ module: 'entrypoint' });
|
||||||
|
|
||||||
|
const utapiProcess = new Process();
|
||||||
|
|
||||||
|
utapiProcess.setup().then(
|
||||||
|
() => {
|
||||||
|
logger.info('Utapi setup completed, starting...');
|
||||||
|
return utapiProcess.start();
|
||||||
|
},
|
||||||
|
async error => {
|
||||||
|
logger.error(`Utapi encountered an unexpected error during setup ${ error.message }`, );
|
||||||
|
await utapiProcess.join(1);
|
||||||
|
},
|
||||||
|
).then(
|
||||||
|
() => logger.info('Utapi started'),
|
||||||
|
async error => {
|
||||||
|
logger.error(`Utapi encountered an error during startup: ${error.message}`);
|
||||||
|
await utapiProcess.join(1);
|
||||||
|
},
|
||||||
|
);
|
|
@ -12,9 +12,12 @@ function orNull(value) {
|
||||||
return value === undefined ? null : value;
|
return value === undefined ? null : value;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
const _multiprocessDB = {};
|
||||||
|
|
||||||
class MemoryCache {
|
class MemoryCache {
|
||||||
constructor() {
|
constructor(config) {
|
||||||
this._data = {};
|
this._data = config && config.multiprocess ? _multiprocessDB : {};
|
||||||
this._shards = {};
|
this._shards = {};
|
||||||
this._prefix = 'utapi';
|
this._prefix = 'utapi';
|
||||||
this._expirations = {};
|
this._expirations = {};
|
||||||
|
|
|
@ -8,6 +8,15 @@ const cacheTypes = {
|
||||||
memory: () => new MemoryCache(),
|
memory: () => new MemoryCache(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
function buildCacheClient(cacheConfig) {
|
||||||
|
const { backend, counter, cache } = cacheConfig;
|
||||||
|
return new CacheClient({
|
||||||
|
cacheBackend: cacheTypes[backend](cache),
|
||||||
|
counterBackend: cacheTypes[backend](counter),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO remove after all users have been moved to buildCacheClient
|
||||||
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
|
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
|
||||||
const counterBackend = cacheTypes[config.cache.backend](config.redis);
|
const counterBackend = cacheTypes[config.cache.backend](config.redis);
|
||||||
|
|
||||||
|
@ -17,5 +26,7 @@ module.exports = {
|
||||||
MemoryCache,
|
MemoryCache,
|
||||||
RedisCache,
|
RedisCache,
|
||||||
},
|
},
|
||||||
|
// TODO remove after all users have been moved to buildCacheClient
|
||||||
client: new CacheClient({ cacheBackend, counterBackend }),
|
client: new CacheClient({ cacheBackend, counterBackend }),
|
||||||
|
buildCacheClient,
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
const { Command } = require('commander');
|
||||||
|
const _config = require('./config');
|
||||||
|
const { comprehend } = require('./utils');
|
||||||
|
const { logger } = require('./utils');
|
||||||
|
const availableSubsystems = [
|
||||||
|
'server',
|
||||||
|
'ingest',
|
||||||
|
'checkpoint',
|
||||||
|
'snapshot',
|
||||||
|
'repair',
|
||||||
|
'reindex',
|
||||||
|
'limit',
|
||||||
|
];
|
||||||
|
|
||||||
|
const enabledSubsystems = [];
|
||||||
|
|
||||||
|
const cli = new Command()
|
||||||
|
.option('--server', 'Start a Utapi metrics server', () =>
|
||||||
|
enabledSubsystems.push('server') && true)
|
||||||
|
.option('--ingest', 'Start the ingest task', () =>
|
||||||
|
enabledSubsystems.push('ingest') && true)
|
||||||
|
.option(
|
||||||
|
'--checkpoint',
|
||||||
|
'Start the checkpoint task scheduler',
|
||||||
|
() => enabledSubsystems.push('checkpoint') && true,
|
||||||
|
)
|
||||||
|
.option('--snapshot', 'Start the snapshot task scheduler', () =>
|
||||||
|
enabledSubsystems.push('snapshot') && true)
|
||||||
|
.option('--repair', 'Start the repair task scheduler', () =>
|
||||||
|
enabledSubsystems.push('repair') && true)
|
||||||
|
.option('--reindex', 'Start the reindex task scheduler', () =>
|
||||||
|
enabledSubsystems.push('reindex') && true)
|
||||||
|
.option(
|
||||||
|
'--now',
|
||||||
|
'Ignore configured schedules and execute specified background tasks immediately, exiting afterward.\n'
|
||||||
|
+ 'Can not be used with --server.',
|
||||||
|
);
|
||||||
|
|
||||||
|
class UtapiCLI {
|
||||||
|
static _parseSubsystems(config) {
|
||||||
|
return availableSubsystems.filter(sys => config[sys]);
|
||||||
|
// return comprehend(subsystems, (_, key) => ({
|
||||||
|
// key,
|
||||||
|
// value: !!config[key],
|
||||||
|
// }));
|
||||||
|
}
|
||||||
|
|
||||||
|
static parse(argv) {
|
||||||
|
const parsed = cli.parse(argv);
|
||||||
|
|
||||||
|
if (parsed.now && enabledSubsystems.includes('server')) {
|
||||||
|
throw new Error('--now can not be used with --server');
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(parsed)
|
||||||
|
const cliConfig = {
|
||||||
|
subsystems: UtapiCLI._parseSubsystems(parsed),
|
||||||
|
};
|
||||||
|
|
||||||
|
return _config.merge(cliConfig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = UtapiCLI;
|
|
@ -350,7 +350,7 @@ class Config {
|
||||||
* @param {object} newConfig - an object using the same structure as the config file
|
* @param {object} newConfig - an object using the same structure as the config file
|
||||||
* @returns {Config} - New Config instance
|
* @returns {Config} - New Config instance
|
||||||
*/
|
*/
|
||||||
static merge(newConfig) {
|
merge(newConfig) {
|
||||||
return new Config(newConfig);
|
return new Config(newConfig);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
114
libV2/process.js
114
libV2/process.js
|
@ -1,16 +1,39 @@
|
||||||
const { EventEmitter } = require('events');
|
const { EventEmitter } = require('events');
|
||||||
const os = require('os');
|
const os = require('os');
|
||||||
const { Command } = require('commander');
|
const async = require('async');
|
||||||
const { logger } = require('./utils');
|
const { logger, comprehend } = require('./utils');
|
||||||
|
const UtapiCLI = require('./cli');
|
||||||
|
const { UtapiServer } = require('./server');
|
||||||
|
const {
|
||||||
|
IngestShard,
|
||||||
|
CreateCheckpoint,
|
||||||
|
CreateSnapshot,
|
||||||
|
RepairTask,
|
||||||
|
ReindexTask,
|
||||||
|
MonitorDiskUsage,
|
||||||
|
} = require('./tasks');
|
||||||
|
|
||||||
|
const subsystems = {
|
||||||
|
server: UtapiServer,
|
||||||
|
ingest: IngestShard,
|
||||||
|
checkpoint: CreateCheckpoint,
|
||||||
|
snapshot: CreateSnapshot,
|
||||||
|
repair: RepairTask,
|
||||||
|
reindex: ReindexTask,
|
||||||
|
limit: MonitorDiskUsage,
|
||||||
|
// TODO split expiration into separate task
|
||||||
|
// expiration:
|
||||||
|
};
|
||||||
|
|
||||||
class Process extends EventEmitter {
|
class Process extends EventEmitter {
|
||||||
constructor(...options) {
|
constructor() {
|
||||||
super(...options);
|
super();
|
||||||
this._program = null;
|
this._config = null;
|
||||||
|
this._subsystems = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
async setup() {
|
_registerSignalHandlers() {
|
||||||
const cleanUpFunc = this.join.bind(this);
|
const cleanUpFunc = this.join.bind(this, 1);
|
||||||
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
|
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
|
||||||
process.on(eventName, cleanUpFunc);
|
process.on(eventName, cleanUpFunc);
|
||||||
});
|
});
|
||||||
|
@ -19,27 +42,78 @@ class Process extends EventEmitter {
|
||||||
{ error, stack: error.stack.split(os.EOL) });
|
{ error, stack: error.stack.split(os.EOL) });
|
||||||
cleanUpFunc();
|
cleanUpFunc();
|
||||||
});
|
});
|
||||||
this._program = new Command();
|
}
|
||||||
await this._setup();
|
|
||||||
|
async setup() {
|
||||||
|
this._registerSignalHandlers();
|
||||||
|
try {
|
||||||
|
this._config = UtapiCLI.parse(process.argv);
|
||||||
|
} catch(error) {
|
||||||
|
console.log(error.message);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// console.log(this._config)
|
||||||
|
this._subsystems = await Process._setupSubSystems(this._config);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static async _setupSubSystems(config) {
|
||||||
|
return async.reduce(config.subsystems, {},
|
||||||
|
// const systems = comprehend(
|
||||||
|
// config.subsystems,
|
||||||
|
async (systems, key) => {
|
||||||
|
const sys = new subsystems[key](config);
|
||||||
|
await sys.setup();
|
||||||
|
systems[key] = sys;
|
||||||
|
return systems;
|
||||||
|
},
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async start() {
|
async start() {
|
||||||
this._program.parse(process.argv);
|
if (!this._subsystems) {
|
||||||
await this._start();
|
throw new Error('The process must be setup before starting!');
|
||||||
|
}
|
||||||
|
// console.log(this._subsystems)
|
||||||
|
await Promise.all(
|
||||||
|
Object.entries(this._subsystems).map(async ([name, sys]) => {
|
||||||
|
try {
|
||||||
|
await sys.start();
|
||||||
|
} catch (error) {
|
||||||
|
const msg = `Error starting subsystem ${name}`;
|
||||||
|
logger.error(msg, { error });
|
||||||
|
throw new Error(msg);
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async join() {
|
async join(returnCode = 0) {
|
||||||
this.emit('exit');
|
this.emit('exit');
|
||||||
await this._join();
|
console.log('-'.repeat(50))
|
||||||
|
if (this._subsystems) {
|
||||||
|
const results = await Promise.all(
|
||||||
|
Object.entries(this._subsystems).map(async ([name, sys]) => {
|
||||||
|
try {
|
||||||
|
await sys.join();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Error stopping subsystem ${name}`, { error });
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
const errors = results.filter(e => e !== null);
|
||||||
|
if (errors.length) {
|
||||||
|
logger.error(`Error stopping subsystems: ${errors.join(', ')}`, { subsystems: errors });
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* eslint-disable class-methods-use-this,no-empty-function */
|
process.exit(returnCode);
|
||||||
async _setup() {}
|
|
||||||
|
|
||||||
async _start() {}
|
|
||||||
|
|
||||||
async _join() {}
|
|
||||||
/* eslint-enable class-methods-use-this,no-empty-function */
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
module.exports = Process;
|
module.exports = Process;
|
||||||
|
|
|
@ -4,8 +4,9 @@ const express = require('express');
|
||||||
const bodyParser = require('body-parser');
|
const bodyParser = require('body-parser');
|
||||||
const { ciphers, dhparam } = require('arsenal').https;
|
const { ciphers, dhparam } = require('arsenal').https;
|
||||||
|
|
||||||
const Process = require('../process');
|
const SubSystem = require('../subsystem');
|
||||||
const config = require('../config');
|
|
||||||
|
// const config = require('../config');
|
||||||
const { initializeOasTools, middleware } = require('./middleware');
|
const { initializeOasTools, middleware } = require('./middleware');
|
||||||
const { spec: apiSpec } = require('./spec');
|
const { spec: apiSpec } = require('./spec');
|
||||||
const { client: cacheClient } = require('../cache');
|
const { client: cacheClient } = require('../cache');
|
||||||
|
@ -15,11 +16,12 @@ const moduleLogger = new LoggerContext({
|
||||||
module: 'server',
|
module: 'server',
|
||||||
});
|
});
|
||||||
|
|
||||||
class UtapiServer extends Process {
|
class UtapiServer extends SubSystem {
|
||||||
constructor() {
|
constructor(config) {
|
||||||
super();
|
super();
|
||||||
this._app = null;
|
this._app = null;
|
||||||
this._server = null;
|
this._server = null;
|
||||||
|
this._config = config;
|
||||||
}
|
}
|
||||||
|
|
||||||
static async _createApp(spec) {
|
static async _createApp(spec) {
|
||||||
|
@ -32,13 +34,13 @@ class UtapiServer extends Process {
|
||||||
return app;
|
return app;
|
||||||
}
|
}
|
||||||
|
|
||||||
static _createHttpsAgent() {
|
_createHttpsAgent() {
|
||||||
const conf = {
|
const conf = {
|
||||||
ciphers: ciphers.ciphers,
|
ciphers: ciphers.ciphers,
|
||||||
dhparam,
|
dhparam,
|
||||||
cert: config.tls.cert,
|
cert: this._config.tls.cert,
|
||||||
key: config.tls.key,
|
key: this._config.tls.key,
|
||||||
ca: config.tls.ca ? [config.tls.ca] : null,
|
ca: this._config.tls.ca ? [this._config.tls.ca] : null,
|
||||||
requestCert: false,
|
requestCert: false,
|
||||||
rejectUnauthorized: true,
|
rejectUnauthorized: true,
|
||||||
};
|
};
|
||||||
|
@ -47,31 +49,31 @@ class UtapiServer extends Process {
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static async _createServer(app) {
|
async _createServer(app) {
|
||||||
if (config.tls) {
|
if (this._config.tls) {
|
||||||
return https.createServer(UtapiServer._createHttpsAgent(), app);
|
return https.createServer(UtapiServer._createHttpsAgent(), app);
|
||||||
}
|
}
|
||||||
return http.createServer(app);
|
return http.createServer(app);
|
||||||
}
|
}
|
||||||
|
|
||||||
static async _startServer(server) {
|
async _startServer(server) {
|
||||||
moduleLogger
|
moduleLogger
|
||||||
.with({
|
.with({
|
||||||
method: 'UtapiServer::_startServer',
|
method: 'UtapiServer::_startServer',
|
||||||
cacheBackend: config.cacheBackend,
|
cacheBackend: this._config.cacheBackend,
|
||||||
})
|
})
|
||||||
.info(`Server listening on ${config.port}`);
|
.info(`Server listening on ${this._config.port}`);
|
||||||
await server.listen(config.port);
|
await server.listen(this._config.port);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _setup() {
|
async _setup() {
|
||||||
this._app = await UtapiServer._createApp(apiSpec);
|
this._app = await UtapiServer._createApp(apiSpec);
|
||||||
this._server = await UtapiServer._createServer(this._app);
|
this._server = await this._createServer(this._app);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _start() {
|
async _start() {
|
||||||
await cacheClient.connect();
|
await cacheClient.connect();
|
||||||
await UtapiServer._startServer(this._server);
|
await this._startServer(this._server);
|
||||||
}
|
}
|
||||||
|
|
||||||
async _join() {
|
async _join() {
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
const { EventEmitter } = require('events');
|
||||||
|
|
||||||
|
class SubSystem extends EventEmitter {
|
||||||
|
constructor(config) {
|
||||||
|
super();
|
||||||
|
this._config = config;
|
||||||
|
}
|
||||||
|
|
||||||
|
async setup() {
|
||||||
|
await this._setup(this._config);
|
||||||
|
}
|
||||||
|
|
||||||
|
async start() {
|
||||||
|
await this._start();
|
||||||
|
}
|
||||||
|
|
||||||
|
async join() {
|
||||||
|
this.emit('exit');
|
||||||
|
await this._join();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* eslint-disable class-methods-use-this,no-empty-function */
|
||||||
|
async _setup() {}
|
||||||
|
|
||||||
|
async _start() {}
|
||||||
|
|
||||||
|
async _join() {}
|
||||||
|
/* eslint-enable class-methods-use-this,no-empty-function */
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = SubSystem;
|
|
@ -2,9 +2,11 @@ const assert = require('assert');
|
||||||
const cron = require('node-schedule');
|
const cron = require('node-schedule');
|
||||||
const cronparser = require('cron-parser');
|
const cronparser = require('cron-parser');
|
||||||
|
|
||||||
const { client: cacheClient } = require('../cache');
|
const { client: cacheClient, buildCacheClient } = require('../cache');
|
||||||
const Process = require('../process');
|
const SubSystem = require('../subsystem');
|
||||||
|
|
||||||
const { LoggerContext, iterIfError } = require('../utils');
|
const { LoggerContext, iterIfError } = require('../utils');
|
||||||
|
const { buildWarp10Clients } = require('../warp10');
|
||||||
|
|
||||||
const logger = new LoggerContext({
|
const logger = new LoggerContext({
|
||||||
module: 'BaseTask',
|
module: 'BaseTask',
|
||||||
|
@ -12,49 +14,51 @@ const logger = new LoggerContext({
|
||||||
|
|
||||||
class Now {}
|
class Now {}
|
||||||
|
|
||||||
class BaseTask extends Process {
|
class BaseTask extends SubSystem {
|
||||||
constructor(options) {
|
constructor(config) {
|
||||||
super();
|
super(config);
|
||||||
assert.notStrictEqual(options, undefined);
|
// TODO construct cache client here rather than globally
|
||||||
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
|
this._cache = null;
|
||||||
this._cache = cacheClient;
|
this._warp10Clients = null;
|
||||||
this._warp10Clients = options.warp10;
|
|
||||||
this._scheduler = null;
|
this._scheduler = null;
|
||||||
this._defaultSchedule = Now;
|
this._defaultSchedule = Now;
|
||||||
this._defaultLag = 0;
|
this._defaultLag = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
async _setup(includeDefaultOpts = true) {
|
async _setup(config) {
|
||||||
if (includeDefaultOpts) {
|
this._nodeId = config.nodeId;
|
||||||
this._program
|
this._cache = buildCacheClient(config.cache);
|
||||||
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
|
this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
|
||||||
.option(
|
// if (includeDefaultOpts) {
|
||||||
'-s, --schedule <crontab>',
|
// this._program
|
||||||
'Execute task using this crontab. Overrides configured schedule',
|
// .option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
|
||||||
value => {
|
// .option(
|
||||||
cronparser.parseExpression(value);
|
// '-s, --schedule <crontab>',
|
||||||
return value;
|
// 'Execute task using this crontab. Overrides configured schedule',
|
||||||
},
|
// value => {
|
||||||
)
|
// cronparser.parseExpression(value);
|
||||||
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
// return value;
|
||||||
.option('-n, --node-id <id>', 'Set a custom node id');
|
// },
|
||||||
}
|
// )
|
||||||
|
// .option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||||
|
// .option('-n, --node-id <id>', 'Set a custom node id');
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
get schedule() {
|
get schedule() {
|
||||||
if (this._program.now) {
|
// if (this._program.now) {
|
||||||
return Now;
|
// return Now;
|
||||||
}
|
// }
|
||||||
if (this._program.schedule) {
|
// if (this._program.schedule) {
|
||||||
return this._program.schedule;
|
// return this._program.schedule;
|
||||||
}
|
// }
|
||||||
return this._defaultSchedule;
|
return this._defaultSchedule;
|
||||||
}
|
}
|
||||||
|
|
||||||
get lag() {
|
get lag() {
|
||||||
if (this._program.lag !== undefined) {
|
// if (this._program.lag !== undefined) {
|
||||||
return this._program.lag;
|
// return this._program.lag;
|
||||||
}
|
// }
|
||||||
return this._defaultLag;
|
return this._defaultLag;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,7 @@ const assert = require('assert');
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
const BaseTask = require('./BaseTask');
|
const BaseTask = require('./BaseTask');
|
||||||
const { UtapiMetric } = require('../models');
|
const { UtapiMetric } = require('../models');
|
||||||
const config = require('../config');
|
// const config = require('../config');
|
||||||
const { checkpointLagSecs } = require('../constants');
|
const { checkpointLagSecs } = require('../constants');
|
||||||
const {
|
const {
|
||||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
||||||
|
@ -15,11 +15,15 @@ const logger = new LoggerContext({
|
||||||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||||
|
|
||||||
class IngestShardTask extends BaseTask {
|
class IngestShardTask extends BaseTask {
|
||||||
constructor(options) {
|
constructor(config, stripEventUUID = true) {
|
||||||
super(options);
|
super(config);
|
||||||
|
this._stripEventUUID = stripEventUUID;
|
||||||
|
}
|
||||||
|
|
||||||
|
async _setup(config) {
|
||||||
|
await super._setup(config);
|
||||||
this._defaultSchedule = config.ingestionSchedule;
|
this._defaultSchedule = config.ingestionSchedule;
|
||||||
this._defaultLag = config.ingestionLagSeconds;
|
this._defaultLag = config.ingestionLagSeconds;
|
||||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_hydrateEvent(data, stripTimestamp = false) {
|
_hydrateEvent(data, stripTimestamp = false) {
|
||||||
|
@ -77,13 +81,13 @@ class IngestShardTask extends BaseTask {
|
||||||
return warp10.ingest(
|
return warp10.ingest(
|
||||||
{
|
{
|
||||||
className: metricClass,
|
className: metricClass,
|
||||||
labels: { origin: config.nodeId },
|
labels: { origin: this._nodeId },
|
||||||
}, records,
|
}, records,
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
assert.strictEqual(status, records.length);
|
assert.strictEqual(status, records.length);
|
||||||
await this._cache.deleteShard(shard);
|
await this._cache.deleteShard(shard);
|
||||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
logger.info(`ingested ${status} records from ${this._nodeId} into ${ingestedIntoNodeId}`);
|
||||||
} else {
|
} else {
|
||||||
logger.debug('No events found in shard, cleaning up');
|
logger.debug('No events found in shard, cleaning up');
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,15 +119,22 @@ class Warp10Client {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const clients = _config.warp10.hosts.map(
|
function buildWarp10Clients(hosts) {
|
||||||
|
return hosts.map(
|
||||||
val => new Warp10Client({
|
val => new Warp10Client({
|
||||||
readToken: _config.warp10.readToken,
|
readToken: _config.warp10.readToken,
|
||||||
writeToken: _config.warp10.writeToken,
|
writeToken: _config.warp10.writeToken,
|
||||||
...val,
|
...val,
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// TODO Remove after all users have been moved to building their own clients
|
||||||
|
const clients = buildWarp10Clients(_config.warp10.hosts);
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
Warp10Client,
|
Warp10Client,
|
||||||
clients,
|
clients,
|
||||||
|
buildWarp10Clients,
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue