Compare commits

...

1 Commits

Author SHA1 Message Date
Taylor McKinnon 6628962b6b stash 2021-03-29 10:37:07 -07:00
11 changed files with 311 additions and 86 deletions

25
bin/utapi.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env node
const Process = require('../libV2/process');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({ module: 'entrypoint' });
const utapiProcess = new Process();
utapiProcess.setup().then(
() => {
logger.info('Utapi setup completed, starting...');
return utapiProcess.start();
},
async error => {
logger.error(`Utapi encountered an unexpected error during setup ${ error.message }`, );
await utapiProcess.join(1);
},
).then(
() => logger.info('Utapi started'),
async error => {
logger.error(`Utapi encountered an error during startup: ${error.message}`);
await utapiProcess.join(1);
},
);

View File

@ -12,9 +12,12 @@ function orNull(value) {
return value === undefined ? null : value; return value === undefined ? null : value;
} }
const _multiprocessDB = {};
class MemoryCache { class MemoryCache {
constructor() { constructor(config) {
this._data = {}; this._data = config && config.multiprocess ? _multiprocessDB : {};
this._shards = {}; this._shards = {};
this._prefix = 'utapi'; this._prefix = 'utapi';
this._expirations = {}; this._expirations = {};

11
libV2/cache/index.js vendored
View File

@ -8,6 +8,15 @@ const cacheTypes = {
memory: () => new MemoryCache(), memory: () => new MemoryCache(),
}; };
function buildCacheClient(cacheConfig) {
const { backend, counter, cache } = cacheConfig;
return new CacheClient({
cacheBackend: cacheTypes[backend](cache),
counterBackend: cacheTypes[backend](counter),
});
}
// TODO remove after all users have been moved to buildCacheClient
const cacheBackend = cacheTypes[config.cache.backend](config.cache); const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis); const counterBackend = cacheTypes[config.cache.backend](config.redis);
@ -17,5 +26,7 @@ module.exports = {
MemoryCache, MemoryCache,
RedisCache, RedisCache,
}, },
// TODO remove after all users have been moved to buildCacheClient
client: new CacheClient({ cacheBackend, counterBackend }), client: new CacheClient({ cacheBackend, counterBackend }),
buildCacheClient,
}; };

64
libV2/cli.js Normal file
View File

@ -0,0 +1,64 @@
const { Command } = require('commander');
const _config = require('./config');
const { comprehend } = require('./utils');
const { logger } = require('./utils');
const availableSubsystems = [
'server',
'ingest',
'checkpoint',
'snapshot',
'repair',
'reindex',
'limit',
];
const enabledSubsystems = [];
const cli = new Command()
.option('--server', 'Start a Utapi metrics server', () =>
enabledSubsystems.push('server') && true)
.option('--ingest', 'Start the ingest task', () =>
enabledSubsystems.push('ingest') && true)
.option(
'--checkpoint',
'Start the checkpoint task scheduler',
() => enabledSubsystems.push('checkpoint') && true,
)
.option('--snapshot', 'Start the snapshot task scheduler', () =>
enabledSubsystems.push('snapshot') && true)
.option('--repair', 'Start the repair task scheduler', () =>
enabledSubsystems.push('repair') && true)
.option('--reindex', 'Start the reindex task scheduler', () =>
enabledSubsystems.push('reindex') && true)
.option(
'--now',
'Ignore configured schedules and execute specified background tasks immediately, exiting afterward.\n'
+ 'Can not be used with --server.',
);
class UtapiCLI {
static _parseSubsystems(config) {
return availableSubsystems.filter(sys => config[sys]);
// return comprehend(subsystems, (_, key) => ({
// key,
// value: !!config[key],
// }));
}
static parse(argv) {
const parsed = cli.parse(argv);
if (parsed.now && enabledSubsystems.includes('server')) {
throw new Error('--now can not be used with --server');
}
console.log(parsed)
const cliConfig = {
subsystems: UtapiCLI._parseSubsystems(parsed),
};
return _config.merge(cliConfig);
}
}
module.exports = UtapiCLI;

View File

@ -350,7 +350,7 @@ class Config {
* @param {object} newConfig - an object using the same structure as the config file * @param {object} newConfig - an object using the same structure as the config file
* @returns {Config} - New Config instance * @returns {Config} - New Config instance
*/ */
static merge(newConfig) { merge(newConfig) {
return new Config(newConfig); return new Config(newConfig);
} }
} }

View File

@ -1,16 +1,39 @@
const { EventEmitter } = require('events'); const { EventEmitter } = require('events');
const os = require('os'); const os = require('os');
const { Command } = require('commander'); const async = require('async');
const { logger } = require('./utils'); const { logger, comprehend } = require('./utils');
const UtapiCLI = require('./cli');
const { UtapiServer } = require('./server');
const {
IngestShard,
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MonitorDiskUsage,
} = require('./tasks');
const subsystems = {
server: UtapiServer,
ingest: IngestShard,
checkpoint: CreateCheckpoint,
snapshot: CreateSnapshot,
repair: RepairTask,
reindex: ReindexTask,
limit: MonitorDiskUsage,
// TODO split expiration into separate task
// expiration:
};
class Process extends EventEmitter { class Process extends EventEmitter {
constructor(...options) { constructor() {
super(...options); super();
this._program = null; this._config = null;
this._subsystems = null;
} }
async setup() { _registerSignalHandlers() {
const cleanUpFunc = this.join.bind(this); const cleanUpFunc = this.join.bind(this, 1);
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => { ['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, cleanUpFunc); process.on(eventName, cleanUpFunc);
}); });
@ -19,27 +42,78 @@ class Process extends EventEmitter {
{ error, stack: error.stack.split(os.EOL) }); { error, stack: error.stack.split(os.EOL) });
cleanUpFunc(); cleanUpFunc();
}); });
this._program = new Command(); }
await this._setup();
async setup() {
this._registerSignalHandlers();
try {
this._config = UtapiCLI.parse(process.argv);
} catch(error) {
console.log(error.message);
return false;
}
// console.log(this._config)
this._subsystems = await Process._setupSubSystems(this._config);
return true;
}
static async _setupSubSystems(config) {
return async.reduce(config.subsystems, {},
// const systems = comprehend(
// config.subsystems,
async (systems, key) => {
const sys = new subsystems[key](config);
await sys.setup();
systems[key] = sys;
return systems;
},
);
} }
async start() { async start() {
this._program.parse(process.argv); if (!this._subsystems) {
await this._start(); throw new Error('The process must be setup before starting!');
}
// console.log(this._subsystems)
await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.start();
} catch (error) {
const msg = `Error starting subsystem ${name}`;
logger.error(msg, { error });
throw new Error(msg);
}
}),
);
} }
async join() { async join(returnCode = 0) {
this.emit('exit'); this.emit('exit');
await this._join(); console.log('-'.repeat(50))
if (this._subsystems) {
const results = await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.join();
} catch (error) {
logger.error(`Error stopping subsystem ${name}`, { error });
return name;
}
return null;
}),
);
const errors = results.filter(e => e !== null);
if (errors.length) {
logger.error(`Error stopping subsystems: ${errors.join(', ')}`, { subsystems: errors });
process.exit(1);
}
} }
/* eslint-disable class-methods-use-this,no-empty-function */ process.exit(returnCode);
async _setup() {} }
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
} }
module.exports = Process; module.exports = Process;

View File

@ -4,8 +4,9 @@ const express = require('express');
const bodyParser = require('body-parser'); const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https; const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process'); const SubSystem = require('../subsystem');
const config = require('../config');
// const config = require('../config');
const { initializeOasTools, middleware } = require('./middleware'); const { initializeOasTools, middleware } = require('./middleware');
const { spec: apiSpec } = require('./spec'); const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache'); const { client: cacheClient } = require('../cache');
@ -15,11 +16,12 @@ const moduleLogger = new LoggerContext({
module: 'server', module: 'server',
}); });
class UtapiServer extends Process { class UtapiServer extends SubSystem {
constructor() { constructor(config) {
super(); super();
this._app = null; this._app = null;
this._server = null; this._server = null;
this._config = config;
} }
static async _createApp(spec) { static async _createApp(spec) {
@ -32,13 +34,13 @@ class UtapiServer extends Process {
return app; return app;
} }
static _createHttpsAgent() { _createHttpsAgent() {
const conf = { const conf = {
ciphers: ciphers.ciphers, ciphers: ciphers.ciphers,
dhparam, dhparam,
cert: config.tls.cert, cert: this._config.tls.cert,
key: config.tls.key, key: this._config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null, ca: this._config.tls.ca ? [this._config.tls.ca] : null,
requestCert: false, requestCert: false,
rejectUnauthorized: true, rejectUnauthorized: true,
}; };
@ -47,31 +49,31 @@ class UtapiServer extends Process {
return conf; return conf;
} }
static async _createServer(app) { async _createServer(app) {
if (config.tls) { if (this._config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app); return https.createServer(UtapiServer._createHttpsAgent(), app);
} }
return http.createServer(app); return http.createServer(app);
} }
static async _startServer(server) { async _startServer(server) {
moduleLogger moduleLogger
.with({ .with({
method: 'UtapiServer::_startServer', method: 'UtapiServer::_startServer',
cacheBackend: config.cacheBackend, cacheBackend: this._config.cacheBackend,
}) })
.info(`Server listening on ${config.port}`); .info(`Server listening on ${this._config.port}`);
await server.listen(config.port); await server.listen(this._config.port);
} }
async _setup() { async _setup() {
this._app = await UtapiServer._createApp(apiSpec); this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app); this._server = await this._createServer(this._app);
} }
async _start() { async _start() {
await cacheClient.connect(); await cacheClient.connect();
await UtapiServer._startServer(this._server); await this._startServer(this._server);
} }
async _join() { async _join() {

31
libV2/subsystem.js Normal file
View File

@ -0,0 +1,31 @@
const { EventEmitter } = require('events');
class SubSystem extends EventEmitter {
constructor(config) {
super();
this._config = config;
}
async setup() {
await this._setup(this._config);
}
async start() {
await this._start();
}
async join() {
this.emit('exit');
await this._join();
}
/* eslint-disable class-methods-use-this,no-empty-function */
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
}
module.exports = SubSystem;

View File

@ -2,9 +2,11 @@ const assert = require('assert');
const cron = require('node-schedule'); const cron = require('node-schedule');
const cronparser = require('cron-parser'); const cronparser = require('cron-parser');
const { client: cacheClient } = require('../cache'); const { client: cacheClient, buildCacheClient } = require('../cache');
const Process = require('../process'); const SubSystem = require('../subsystem');
const { LoggerContext, iterIfError } = require('../utils'); const { LoggerContext, iterIfError } = require('../utils');
const { buildWarp10Clients } = require('../warp10');
const logger = new LoggerContext({ const logger = new LoggerContext({
module: 'BaseTask', module: 'BaseTask',
@ -12,49 +14,51 @@ const logger = new LoggerContext({
class Now {} class Now {}
class BaseTask extends Process { class BaseTask extends SubSystem {
constructor(options) { constructor(config) {
super(); super(config);
assert.notStrictEqual(options, undefined); // TODO construct cache client here rather than globally
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients'); this._cache = null;
this._cache = cacheClient; this._warp10Clients = null;
this._warp10Clients = options.warp10;
this._scheduler = null; this._scheduler = null;
this._defaultSchedule = Now; this._defaultSchedule = Now;
this._defaultLag = 0; this._defaultLag = 0;
} }
async _setup(includeDefaultOpts = true) { async _setup(config) {
if (includeDefaultOpts) { this._nodeId = config.nodeId;
this._program this._cache = buildCacheClient(config.cache);
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.') this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
.option( // if (includeDefaultOpts) {
'-s, --schedule <crontab>', // this._program
'Execute task using this crontab. Overrides configured schedule', // .option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
value => { // .option(
cronparser.parseExpression(value); // '-s, --schedule <crontab>',
return value; // 'Execute task using this crontab. Overrides configured schedule',
}, // value => {
) // cronparser.parseExpression(value);
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10)) // return value;
.option('-n, --node-id <id>', 'Set a custom node id'); // },
} // )
// .option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
// .option('-n, --node-id <id>', 'Set a custom node id');
// }
} }
get schedule() { get schedule() {
if (this._program.now) { // if (this._program.now) {
return Now; // return Now;
} // }
if (this._program.schedule) { // if (this._program.schedule) {
return this._program.schedule; // return this._program.schedule;
} // }
return this._defaultSchedule; return this._defaultSchedule;
} }
get lag() { get lag() {
if (this._program.lag !== undefined) { // if (this._program.lag !== undefined) {
return this._program.lag; // return this._program.lag;
} // }
return this._defaultLag; return this._defaultLag;
} }

View File

@ -2,7 +2,7 @@ const assert = require('assert');
const async = require('async'); const async = require('async');
const BaseTask = require('./BaseTask'); const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models'); const { UtapiMetric } = require('../models');
const config = require('../config'); // const config = require('../config');
const { checkpointLagSecs } = require('../constants'); const { checkpointLagSecs } = require('../constants');
const { const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now, LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
@ -15,11 +15,15 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs); const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask { class IngestShardTask extends BaseTask {
constructor(options) { constructor(config, stripEventUUID = true) {
super(options); super(config);
this._stripEventUUID = stripEventUUID;
}
async _setup(config) {
await super._setup(config);
this._defaultSchedule = config.ingestionSchedule; this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds; this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
} }
_hydrateEvent(data, stripTimestamp = false) { _hydrateEvent(data, stripTimestamp = false) {
@ -77,13 +81,13 @@ class IngestShardTask extends BaseTask {
return warp10.ingest( return warp10.ingest(
{ {
className: metricClass, className: metricClass,
labels: { origin: config.nodeId }, labels: { origin: this._nodeId },
}, records, }, records,
); );
}); });
assert.strictEqual(status, records.length); assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard); await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`); logger.info(`ingested ${status} records from ${this._nodeId} into ${ingestedIntoNodeId}`);
} else { } else {
logger.debug('No events found in shard, cleaning up'); logger.debug('No events found in shard, cleaning up');
} }

View File

@ -119,15 +119,22 @@ class Warp10Client {
} }
} }
const clients = _config.warp10.hosts.map( function buildWarp10Clients(hosts) {
return hosts.map(
val => new Warp10Client({ val => new Warp10Client({
readToken: _config.warp10.readToken, readToken: _config.warp10.readToken,
writeToken: _config.warp10.writeToken, writeToken: _config.warp10.writeToken,
...val, ...val,
}), }),
); );
}
// TODO Remove after all users have been moved to building their own clients
const clients = buildWarp10Clients(_config.warp10.hosts);
module.exports = { module.exports = {
Warp10Client, Warp10Client,
clients, clients,
buildWarp10Clients,
}; };