Compare commits
1 Commits
developmen
...
feature/un
Author | SHA1 | Date |
---|---|---|
Taylor McKinnon | 6628962b6b |
|
@ -0,0 +1,25 @@
|
|||
#!/usr/bin/env node
|
||||
const Process = require('../libV2/process');
|
||||
|
||||
const { LoggerContext } = require('../libV2/utils');
|
||||
|
||||
const logger = new LoggerContext({ module: 'entrypoint' });
|
||||
|
||||
const utapiProcess = new Process();
|
||||
|
||||
utapiProcess.setup().then(
|
||||
() => {
|
||||
logger.info('Utapi setup completed, starting...');
|
||||
return utapiProcess.start();
|
||||
},
|
||||
async error => {
|
||||
logger.error(`Utapi encountered an unexpected error during setup ${ error.message }`, );
|
||||
await utapiProcess.join(1);
|
||||
},
|
||||
).then(
|
||||
() => logger.info('Utapi started'),
|
||||
async error => {
|
||||
logger.error(`Utapi encountered an error during startup: ${error.message}`);
|
||||
await utapiProcess.join(1);
|
||||
},
|
||||
);
|
|
@ -12,9 +12,12 @@ function orNull(value) {
|
|||
return value === undefined ? null : value;
|
||||
}
|
||||
|
||||
|
||||
const _multiprocessDB = {};
|
||||
|
||||
class MemoryCache {
|
||||
constructor() {
|
||||
this._data = {};
|
||||
constructor(config) {
|
||||
this._data = config && config.multiprocess ? _multiprocessDB : {};
|
||||
this._shards = {};
|
||||
this._prefix = 'utapi';
|
||||
this._expirations = {};
|
||||
|
|
|
@ -8,6 +8,15 @@ const cacheTypes = {
|
|||
memory: () => new MemoryCache(),
|
||||
};
|
||||
|
||||
function buildCacheClient(cacheConfig) {
|
||||
const { backend, counter, cache } = cacheConfig;
|
||||
return new CacheClient({
|
||||
cacheBackend: cacheTypes[backend](cache),
|
||||
counterBackend: cacheTypes[backend](counter),
|
||||
});
|
||||
}
|
||||
|
||||
// TODO remove after all users have been moved to buildCacheClient
|
||||
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
|
||||
const counterBackend = cacheTypes[config.cache.backend](config.redis);
|
||||
|
||||
|
@ -17,5 +26,7 @@ module.exports = {
|
|||
MemoryCache,
|
||||
RedisCache,
|
||||
},
|
||||
// TODO remove after all users have been moved to buildCacheClient
|
||||
client: new CacheClient({ cacheBackend, counterBackend }),
|
||||
buildCacheClient,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
const { Command } = require('commander');
|
||||
const _config = require('./config');
|
||||
const { comprehend } = require('./utils');
|
||||
const { logger } = require('./utils');
|
||||
const availableSubsystems = [
|
||||
'server',
|
||||
'ingest',
|
||||
'checkpoint',
|
||||
'snapshot',
|
||||
'repair',
|
||||
'reindex',
|
||||
'limit',
|
||||
];
|
||||
|
||||
const enabledSubsystems = [];
|
||||
|
||||
const cli = new Command()
|
||||
.option('--server', 'Start a Utapi metrics server', () =>
|
||||
enabledSubsystems.push('server') && true)
|
||||
.option('--ingest', 'Start the ingest task', () =>
|
||||
enabledSubsystems.push('ingest') && true)
|
||||
.option(
|
||||
'--checkpoint',
|
||||
'Start the checkpoint task scheduler',
|
||||
() => enabledSubsystems.push('checkpoint') && true,
|
||||
)
|
||||
.option('--snapshot', 'Start the snapshot task scheduler', () =>
|
||||
enabledSubsystems.push('snapshot') && true)
|
||||
.option('--repair', 'Start the repair task scheduler', () =>
|
||||
enabledSubsystems.push('repair') && true)
|
||||
.option('--reindex', 'Start the reindex task scheduler', () =>
|
||||
enabledSubsystems.push('reindex') && true)
|
||||
.option(
|
||||
'--now',
|
||||
'Ignore configured schedules and execute specified background tasks immediately, exiting afterward.\n'
|
||||
+ 'Can not be used with --server.',
|
||||
);
|
||||
|
||||
class UtapiCLI {
|
||||
static _parseSubsystems(config) {
|
||||
return availableSubsystems.filter(sys => config[sys]);
|
||||
// return comprehend(subsystems, (_, key) => ({
|
||||
// key,
|
||||
// value: !!config[key],
|
||||
// }));
|
||||
}
|
||||
|
||||
static parse(argv) {
|
||||
const parsed = cli.parse(argv);
|
||||
|
||||
if (parsed.now && enabledSubsystems.includes('server')) {
|
||||
throw new Error('--now can not be used with --server');
|
||||
}
|
||||
|
||||
console.log(parsed)
|
||||
const cliConfig = {
|
||||
subsystems: UtapiCLI._parseSubsystems(parsed),
|
||||
};
|
||||
|
||||
return _config.merge(cliConfig);
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = UtapiCLI;
|
|
@ -350,7 +350,7 @@ class Config {
|
|||
* @param {object} newConfig - an object using the same structure as the config file
|
||||
* @returns {Config} - New Config instance
|
||||
*/
|
||||
static merge(newConfig) {
|
||||
merge(newConfig) {
|
||||
return new Config(newConfig);
|
||||
}
|
||||
}
|
||||
|
|
116
libV2/process.js
116
libV2/process.js
|
@ -1,16 +1,39 @@
|
|||
const { EventEmitter } = require('events');
|
||||
const os = require('os');
|
||||
const { Command } = require('commander');
|
||||
const { logger } = require('./utils');
|
||||
const async = require('async');
|
||||
const { logger, comprehend } = require('./utils');
|
||||
const UtapiCLI = require('./cli');
|
||||
const { UtapiServer } = require('./server');
|
||||
const {
|
||||
IngestShard,
|
||||
CreateCheckpoint,
|
||||
CreateSnapshot,
|
||||
RepairTask,
|
||||
ReindexTask,
|
||||
MonitorDiskUsage,
|
||||
} = require('./tasks');
|
||||
|
||||
const subsystems = {
|
||||
server: UtapiServer,
|
||||
ingest: IngestShard,
|
||||
checkpoint: CreateCheckpoint,
|
||||
snapshot: CreateSnapshot,
|
||||
repair: RepairTask,
|
||||
reindex: ReindexTask,
|
||||
limit: MonitorDiskUsage,
|
||||
// TODO split expiration into separate task
|
||||
// expiration:
|
||||
};
|
||||
|
||||
class Process extends EventEmitter {
|
||||
constructor(...options) {
|
||||
super(...options);
|
||||
this._program = null;
|
||||
constructor() {
|
||||
super();
|
||||
this._config = null;
|
||||
this._subsystems = null;
|
||||
}
|
||||
|
||||
async setup() {
|
||||
const cleanUpFunc = this.join.bind(this);
|
||||
_registerSignalHandlers() {
|
||||
const cleanUpFunc = this.join.bind(this, 1);
|
||||
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
|
||||
process.on(eventName, cleanUpFunc);
|
||||
});
|
||||
|
@ -19,27 +42,78 @@ class Process extends EventEmitter {
|
|||
{ error, stack: error.stack.split(os.EOL) });
|
||||
cleanUpFunc();
|
||||
});
|
||||
this._program = new Command();
|
||||
await this._setup();
|
||||
}
|
||||
|
||||
async setup() {
|
||||
this._registerSignalHandlers();
|
||||
try {
|
||||
this._config = UtapiCLI.parse(process.argv);
|
||||
} catch(error) {
|
||||
console.log(error.message);
|
||||
return false;
|
||||
}
|
||||
// console.log(this._config)
|
||||
this._subsystems = await Process._setupSubSystems(this._config);
|
||||
return true;
|
||||
}
|
||||
|
||||
static async _setupSubSystems(config) {
|
||||
return async.reduce(config.subsystems, {},
|
||||
// const systems = comprehend(
|
||||
// config.subsystems,
|
||||
async (systems, key) => {
|
||||
const sys = new subsystems[key](config);
|
||||
await sys.setup();
|
||||
systems[key] = sys;
|
||||
return systems;
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
async start() {
|
||||
this._program.parse(process.argv);
|
||||
await this._start();
|
||||
if (!this._subsystems) {
|
||||
throw new Error('The process must be setup before starting!');
|
||||
}
|
||||
// console.log(this._subsystems)
|
||||
await Promise.all(
|
||||
Object.entries(this._subsystems).map(async ([name, sys]) => {
|
||||
try {
|
||||
await sys.start();
|
||||
} catch (error) {
|
||||
const msg = `Error starting subsystem ${name}`;
|
||||
logger.error(msg, { error });
|
||||
throw new Error(msg);
|
||||
}
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
async join() {
|
||||
async join(returnCode = 0) {
|
||||
this.emit('exit');
|
||||
await this._join();
|
||||
console.log('-'.repeat(50))
|
||||
if (this._subsystems) {
|
||||
const results = await Promise.all(
|
||||
Object.entries(this._subsystems).map(async ([name, sys]) => {
|
||||
try {
|
||||
await sys.join();
|
||||
} catch (error) {
|
||||
logger.error(`Error stopping subsystem ${name}`, { error });
|
||||
return name;
|
||||
}
|
||||
return null;
|
||||
}),
|
||||
);
|
||||
|
||||
const errors = results.filter(e => e !== null);
|
||||
if (errors.length) {
|
||||
logger.error(`Error stopping subsystems: ${errors.join(', ')}`, { subsystems: errors });
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
process.exit(returnCode);
|
||||
}
|
||||
|
||||
/* eslint-disable class-methods-use-this,no-empty-function */
|
||||
async _setup() {}
|
||||
|
||||
async _start() {}
|
||||
|
||||
async _join() {}
|
||||
/* eslint-enable class-methods-use-this,no-empty-function */
|
||||
}
|
||||
|
||||
|
||||
module.exports = Process;
|
||||
|
|
|
@ -4,8 +4,9 @@ const express = require('express');
|
|||
const bodyParser = require('body-parser');
|
||||
const { ciphers, dhparam } = require('arsenal').https;
|
||||
|
||||
const Process = require('../process');
|
||||
const config = require('../config');
|
||||
const SubSystem = require('../subsystem');
|
||||
|
||||
// const config = require('../config');
|
||||
const { initializeOasTools, middleware } = require('./middleware');
|
||||
const { spec: apiSpec } = require('./spec');
|
||||
const { client: cacheClient } = require('../cache');
|
||||
|
@ -15,11 +16,12 @@ const moduleLogger = new LoggerContext({
|
|||
module: 'server',
|
||||
});
|
||||
|
||||
class UtapiServer extends Process {
|
||||
constructor() {
|
||||
class UtapiServer extends SubSystem {
|
||||
constructor(config) {
|
||||
super();
|
||||
this._app = null;
|
||||
this._server = null;
|
||||
this._config = config;
|
||||
}
|
||||
|
||||
static async _createApp(spec) {
|
||||
|
@ -32,13 +34,13 @@ class UtapiServer extends Process {
|
|||
return app;
|
||||
}
|
||||
|
||||
static _createHttpsAgent() {
|
||||
_createHttpsAgent() {
|
||||
const conf = {
|
||||
ciphers: ciphers.ciphers,
|
||||
dhparam,
|
||||
cert: config.tls.cert,
|
||||
key: config.tls.key,
|
||||
ca: config.tls.ca ? [config.tls.ca] : null,
|
||||
cert: this._config.tls.cert,
|
||||
key: this._config.tls.key,
|
||||
ca: this._config.tls.ca ? [this._config.tls.ca] : null,
|
||||
requestCert: false,
|
||||
rejectUnauthorized: true,
|
||||
};
|
||||
|
@ -47,31 +49,31 @@ class UtapiServer extends Process {
|
|||
return conf;
|
||||
}
|
||||
|
||||
static async _createServer(app) {
|
||||
if (config.tls) {
|
||||
async _createServer(app) {
|
||||
if (this._config.tls) {
|
||||
return https.createServer(UtapiServer._createHttpsAgent(), app);
|
||||
}
|
||||
return http.createServer(app);
|
||||
}
|
||||
|
||||
static async _startServer(server) {
|
||||
async _startServer(server) {
|
||||
moduleLogger
|
||||
.with({
|
||||
method: 'UtapiServer::_startServer',
|
||||
cacheBackend: config.cacheBackend,
|
||||
cacheBackend: this._config.cacheBackend,
|
||||
})
|
||||
.info(`Server listening on ${config.port}`);
|
||||
await server.listen(config.port);
|
||||
.info(`Server listening on ${this._config.port}`);
|
||||
await server.listen(this._config.port);
|
||||
}
|
||||
|
||||
async _setup() {
|
||||
this._app = await UtapiServer._createApp(apiSpec);
|
||||
this._server = await UtapiServer._createServer(this._app);
|
||||
this._server = await this._createServer(this._app);
|
||||
}
|
||||
|
||||
async _start() {
|
||||
await cacheClient.connect();
|
||||
await UtapiServer._startServer(this._server);
|
||||
await this._startServer(this._server);
|
||||
}
|
||||
|
||||
async _join() {
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
const { EventEmitter } = require('events');
|
||||
|
||||
class SubSystem extends EventEmitter {
|
||||
constructor(config) {
|
||||
super();
|
||||
this._config = config;
|
||||
}
|
||||
|
||||
async setup() {
|
||||
await this._setup(this._config);
|
||||
}
|
||||
|
||||
async start() {
|
||||
await this._start();
|
||||
}
|
||||
|
||||
async join() {
|
||||
this.emit('exit');
|
||||
await this._join();
|
||||
}
|
||||
|
||||
/* eslint-disable class-methods-use-this,no-empty-function */
|
||||
async _setup() {}
|
||||
|
||||
async _start() {}
|
||||
|
||||
async _join() {}
|
||||
/* eslint-enable class-methods-use-this,no-empty-function */
|
||||
}
|
||||
|
||||
module.exports = SubSystem;
|
|
@ -2,9 +2,11 @@ const assert = require('assert');
|
|||
const cron = require('node-schedule');
|
||||
const cronparser = require('cron-parser');
|
||||
|
||||
const { client: cacheClient } = require('../cache');
|
||||
const Process = require('../process');
|
||||
const { client: cacheClient, buildCacheClient } = require('../cache');
|
||||
const SubSystem = require('../subsystem');
|
||||
|
||||
const { LoggerContext, iterIfError } = require('../utils');
|
||||
const { buildWarp10Clients } = require('../warp10');
|
||||
|
||||
const logger = new LoggerContext({
|
||||
module: 'BaseTask',
|
||||
|
@ -12,49 +14,51 @@ const logger = new LoggerContext({
|
|||
|
||||
class Now {}
|
||||
|
||||
class BaseTask extends Process {
|
||||
constructor(options) {
|
||||
super();
|
||||
assert.notStrictEqual(options, undefined);
|
||||
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
|
||||
this._cache = cacheClient;
|
||||
this._warp10Clients = options.warp10;
|
||||
class BaseTask extends SubSystem {
|
||||
constructor(config) {
|
||||
super(config);
|
||||
// TODO construct cache client here rather than globally
|
||||
this._cache = null;
|
||||
this._warp10Clients = null;
|
||||
this._scheduler = null;
|
||||
this._defaultSchedule = Now;
|
||||
this._defaultLag = 0;
|
||||
}
|
||||
|
||||
async _setup(includeDefaultOpts = true) {
|
||||
if (includeDefaultOpts) {
|
||||
this._program
|
||||
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
|
||||
.option(
|
||||
'-s, --schedule <crontab>',
|
||||
'Execute task using this crontab. Overrides configured schedule',
|
||||
value => {
|
||||
cronparser.parseExpression(value);
|
||||
return value;
|
||||
},
|
||||
)
|
||||
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||
.option('-n, --node-id <id>', 'Set a custom node id');
|
||||
}
|
||||
async _setup(config) {
|
||||
this._nodeId = config.nodeId;
|
||||
this._cache = buildCacheClient(config.cache);
|
||||
this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
|
||||
// if (includeDefaultOpts) {
|
||||
// this._program
|
||||
// .option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
|
||||
// .option(
|
||||
// '-s, --schedule <crontab>',
|
||||
// 'Execute task using this crontab. Overrides configured schedule',
|
||||
// value => {
|
||||
// cronparser.parseExpression(value);
|
||||
// return value;
|
||||
// },
|
||||
// )
|
||||
// .option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
|
||||
// .option('-n, --node-id <id>', 'Set a custom node id');
|
||||
// }
|
||||
}
|
||||
|
||||
get schedule() {
|
||||
if (this._program.now) {
|
||||
return Now;
|
||||
}
|
||||
if (this._program.schedule) {
|
||||
return this._program.schedule;
|
||||
}
|
||||
// if (this._program.now) {
|
||||
// return Now;
|
||||
// }
|
||||
// if (this._program.schedule) {
|
||||
// return this._program.schedule;
|
||||
// }
|
||||
return this._defaultSchedule;
|
||||
}
|
||||
|
||||
get lag() {
|
||||
if (this._program.lag !== undefined) {
|
||||
return this._program.lag;
|
||||
}
|
||||
// if (this._program.lag !== undefined) {
|
||||
// return this._program.lag;
|
||||
// }
|
||||
return this._defaultLag;
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ const assert = require('assert');
|
|||
const async = require('async');
|
||||
const BaseTask = require('./BaseTask');
|
||||
const { UtapiMetric } = require('../models');
|
||||
const config = require('../config');
|
||||
// const config = require('../config');
|
||||
const { checkpointLagSecs } = require('../constants');
|
||||
const {
|
||||
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
|
||||
|
@ -15,11 +15,15 @@ const logger = new LoggerContext({
|
|||
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
|
||||
|
||||
class IngestShardTask extends BaseTask {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
constructor(config, stripEventUUID = true) {
|
||||
super(config);
|
||||
this._stripEventUUID = stripEventUUID;
|
||||
}
|
||||
|
||||
async _setup(config) {
|
||||
await super._setup(config);
|
||||
this._defaultSchedule = config.ingestionSchedule;
|
||||
this._defaultLag = config.ingestionLagSeconds;
|
||||
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
|
||||
}
|
||||
|
||||
_hydrateEvent(data, stripTimestamp = false) {
|
||||
|
@ -77,13 +81,13 @@ class IngestShardTask extends BaseTask {
|
|||
return warp10.ingest(
|
||||
{
|
||||
className: metricClass,
|
||||
labels: { origin: config.nodeId },
|
||||
labels: { origin: this._nodeId },
|
||||
}, records,
|
||||
);
|
||||
});
|
||||
assert.strictEqual(status, records.length);
|
||||
await this._cache.deleteShard(shard);
|
||||
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
|
||||
logger.info(`ingested ${status} records from ${this._nodeId} into ${ingestedIntoNodeId}`);
|
||||
} else {
|
||||
logger.debug('No events found in shard, cleaning up');
|
||||
}
|
||||
|
|
|
@ -119,15 +119,22 @@ class Warp10Client {
|
|||
}
|
||||
}
|
||||
|
||||
const clients = _config.warp10.hosts.map(
|
||||
val => new Warp10Client({
|
||||
readToken: _config.warp10.readToken,
|
||||
writeToken: _config.warp10.writeToken,
|
||||
...val,
|
||||
}),
|
||||
);
|
||||
function buildWarp10Clients(hosts) {
|
||||
return hosts.map(
|
||||
val => new Warp10Client({
|
||||
readToken: _config.warp10.readToken,
|
||||
writeToken: _config.warp10.writeToken,
|
||||
...val,
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
// TODO Remove after all users have been moved to building their own clients
|
||||
const clients = buildWarp10Clients(_config.warp10.hosts);
|
||||
|
||||
module.exports = {
|
||||
Warp10Client,
|
||||
clients,
|
||||
buildWarp10Clients,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue