Compare commits

...

1 Commits

Author SHA1 Message Date
Taylor McKinnon 6628962b6b stash 2021-03-29 10:37:07 -07:00
11 changed files with 311 additions and 86 deletions

25
bin/utapi.js Executable file
View File

@ -0,0 +1,25 @@
#!/usr/bin/env node
const Process = require('../libV2/process');
const { LoggerContext } = require('../libV2/utils');
const logger = new LoggerContext({ module: 'entrypoint' });
const utapiProcess = new Process();
utapiProcess.setup().then(
() => {
logger.info('Utapi setup completed, starting...');
return utapiProcess.start();
},
async error => {
logger.error(`Utapi encountered an unexpected error during setup ${ error.message }`, );
await utapiProcess.join(1);
},
).then(
() => logger.info('Utapi started'),
async error => {
logger.error(`Utapi encountered an error during startup: ${error.message}`);
await utapiProcess.join(1);
},
);

View File

@ -12,9 +12,12 @@ function orNull(value) {
return value === undefined ? null : value;
}
const _multiprocessDB = {};
class MemoryCache {
constructor() {
this._data = {};
constructor(config) {
this._data = config && config.multiprocess ? _multiprocessDB : {};
this._shards = {};
this._prefix = 'utapi';
this._expirations = {};

11
libV2/cache/index.js vendored
View File

@ -8,6 +8,15 @@ const cacheTypes = {
memory: () => new MemoryCache(),
};
function buildCacheClient(cacheConfig) {
const { backend, counter, cache } = cacheConfig;
return new CacheClient({
cacheBackend: cacheTypes[backend](cache),
counterBackend: cacheTypes[backend](counter),
});
}
// TODO remove after all users have been moved to buildCacheClient
const cacheBackend = cacheTypes[config.cache.backend](config.cache);
const counterBackend = cacheTypes[config.cache.backend](config.redis);
@ -17,5 +26,7 @@ module.exports = {
MemoryCache,
RedisCache,
},
// TODO remove after all users have been moved to buildCacheClient
client: new CacheClient({ cacheBackend, counterBackend }),
buildCacheClient,
};

64
libV2/cli.js Normal file
View File

@ -0,0 +1,64 @@
const { Command } = require('commander');
const _config = require('./config');
const { comprehend } = require('./utils');
const { logger } = require('./utils');
const availableSubsystems = [
'server',
'ingest',
'checkpoint',
'snapshot',
'repair',
'reindex',
'limit',
];
const enabledSubsystems = [];
const cli = new Command()
.option('--server', 'Start a Utapi metrics server', () =>
enabledSubsystems.push('server') && true)
.option('--ingest', 'Start the ingest task', () =>
enabledSubsystems.push('ingest') && true)
.option(
'--checkpoint',
'Start the checkpoint task scheduler',
() => enabledSubsystems.push('checkpoint') && true,
)
.option('--snapshot', 'Start the snapshot task scheduler', () =>
enabledSubsystems.push('snapshot') && true)
.option('--repair', 'Start the repair task scheduler', () =>
enabledSubsystems.push('repair') && true)
.option('--reindex', 'Start the reindex task scheduler', () =>
enabledSubsystems.push('reindex') && true)
.option(
'--now',
'Ignore configured schedules and execute specified background tasks immediately, exiting afterward.\n'
+ 'Can not be used with --server.',
);
class UtapiCLI {
static _parseSubsystems(config) {
return availableSubsystems.filter(sys => config[sys]);
// return comprehend(subsystems, (_, key) => ({
// key,
// value: !!config[key],
// }));
}
static parse(argv) {
const parsed = cli.parse(argv);
if (parsed.now && enabledSubsystems.includes('server')) {
throw new Error('--now can not be used with --server');
}
console.log(parsed)
const cliConfig = {
subsystems: UtapiCLI._parseSubsystems(parsed),
};
return _config.merge(cliConfig);
}
}
module.exports = UtapiCLI;

View File

@ -350,7 +350,7 @@ class Config {
* @param {object} newConfig - an object using the same structure as the config file
* @returns {Config} - New Config instance
*/
static merge(newConfig) {
merge(newConfig) {
return new Config(newConfig);
}
}

View File

@ -1,16 +1,39 @@
const { EventEmitter } = require('events');
const os = require('os');
const { Command } = require('commander');
const { logger } = require('./utils');
const async = require('async');
const { logger, comprehend } = require('./utils');
const UtapiCLI = require('./cli');
const { UtapiServer } = require('./server');
const {
IngestShard,
CreateCheckpoint,
CreateSnapshot,
RepairTask,
ReindexTask,
MonitorDiskUsage,
} = require('./tasks');
const subsystems = {
server: UtapiServer,
ingest: IngestShard,
checkpoint: CreateCheckpoint,
snapshot: CreateSnapshot,
repair: RepairTask,
reindex: ReindexTask,
limit: MonitorDiskUsage,
// TODO split expiration into separate task
// expiration:
};
class Process extends EventEmitter {
constructor(...options) {
super(...options);
this._program = null;
constructor() {
super();
this._config = null;
this._subsystems = null;
}
async setup() {
const cleanUpFunc = this.join.bind(this);
_registerSignalHandlers() {
const cleanUpFunc = this.join.bind(this, 1);
['SIGINT', 'SIGQUIT', 'SIGTERM'].forEach(eventName => {
process.on(eventName, cleanUpFunc);
});
@ -19,27 +42,78 @@ class Process extends EventEmitter {
{ error, stack: error.stack.split(os.EOL) });
cleanUpFunc();
});
this._program = new Command();
await this._setup();
}
async setup() {
this._registerSignalHandlers();
try {
this._config = UtapiCLI.parse(process.argv);
} catch(error) {
console.log(error.message);
return false;
}
// console.log(this._config)
this._subsystems = await Process._setupSubSystems(this._config);
return true;
}
static async _setupSubSystems(config) {
return async.reduce(config.subsystems, {},
// const systems = comprehend(
// config.subsystems,
async (systems, key) => {
const sys = new subsystems[key](config);
await sys.setup();
systems[key] = sys;
return systems;
},
);
}
async start() {
this._program.parse(process.argv);
await this._start();
if (!this._subsystems) {
throw new Error('The process must be setup before starting!');
}
// console.log(this._subsystems)
await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.start();
} catch (error) {
const msg = `Error starting subsystem ${name}`;
logger.error(msg, { error });
throw new Error(msg);
}
}),
);
}
async join() {
async join(returnCode = 0) {
this.emit('exit');
await this._join();
console.log('-'.repeat(50))
if (this._subsystems) {
const results = await Promise.all(
Object.entries(this._subsystems).map(async ([name, sys]) => {
try {
await sys.join();
} catch (error) {
logger.error(`Error stopping subsystem ${name}`, { error });
return name;
}
return null;
}),
);
const errors = results.filter(e => e !== null);
if (errors.length) {
logger.error(`Error stopping subsystems: ${errors.join(', ')}`, { subsystems: errors });
process.exit(1);
}
}
process.exit(returnCode);
}
/* eslint-disable class-methods-use-this,no-empty-function */
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
}
module.exports = Process;

View File

@ -4,8 +4,9 @@ const express = require('express');
const bodyParser = require('body-parser');
const { ciphers, dhparam } = require('arsenal').https;
const Process = require('../process');
const config = require('../config');
const SubSystem = require('../subsystem');
// const config = require('../config');
const { initializeOasTools, middleware } = require('./middleware');
const { spec: apiSpec } = require('./spec');
const { client: cacheClient } = require('../cache');
@ -15,11 +16,12 @@ const moduleLogger = new LoggerContext({
module: 'server',
});
class UtapiServer extends Process {
constructor() {
class UtapiServer extends SubSystem {
constructor(config) {
super();
this._app = null;
this._server = null;
this._config = config;
}
static async _createApp(spec) {
@ -32,13 +34,13 @@ class UtapiServer extends Process {
return app;
}
static _createHttpsAgent() {
_createHttpsAgent() {
const conf = {
ciphers: ciphers.ciphers,
dhparam,
cert: config.tls.cert,
key: config.tls.key,
ca: config.tls.ca ? [config.tls.ca] : null,
cert: this._config.tls.cert,
key: this._config.tls.key,
ca: this._config.tls.ca ? [this._config.tls.ca] : null,
requestCert: false,
rejectUnauthorized: true,
};
@ -47,31 +49,31 @@ class UtapiServer extends Process {
return conf;
}
static async _createServer(app) {
if (config.tls) {
async _createServer(app) {
if (this._config.tls) {
return https.createServer(UtapiServer._createHttpsAgent(), app);
}
return http.createServer(app);
}
static async _startServer(server) {
async _startServer(server) {
moduleLogger
.with({
method: 'UtapiServer::_startServer',
cacheBackend: config.cacheBackend,
cacheBackend: this._config.cacheBackend,
})
.info(`Server listening on ${config.port}`);
await server.listen(config.port);
.info(`Server listening on ${this._config.port}`);
await server.listen(this._config.port);
}
async _setup() {
this._app = await UtapiServer._createApp(apiSpec);
this._server = await UtapiServer._createServer(this._app);
this._server = await this._createServer(this._app);
}
async _start() {
await cacheClient.connect();
await UtapiServer._startServer(this._server);
await this._startServer(this._server);
}
async _join() {

31
libV2/subsystem.js Normal file
View File

@ -0,0 +1,31 @@
const { EventEmitter } = require('events');
class SubSystem extends EventEmitter {
constructor(config) {
super();
this._config = config;
}
async setup() {
await this._setup(this._config);
}
async start() {
await this._start();
}
async join() {
this.emit('exit');
await this._join();
}
/* eslint-disable class-methods-use-this,no-empty-function */
async _setup() {}
async _start() {}
async _join() {}
/* eslint-enable class-methods-use-this,no-empty-function */
}
module.exports = SubSystem;

View File

@ -2,9 +2,11 @@ const assert = require('assert');
const cron = require('node-schedule');
const cronparser = require('cron-parser');
const { client: cacheClient } = require('../cache');
const Process = require('../process');
const { client: cacheClient, buildCacheClient } = require('../cache');
const SubSystem = require('../subsystem');
const { LoggerContext, iterIfError } = require('../utils');
const { buildWarp10Clients } = require('../warp10');
const logger = new LoggerContext({
module: 'BaseTask',
@ -12,49 +14,51 @@ const logger = new LoggerContext({
class Now {}
class BaseTask extends Process {
constructor(options) {
super();
assert.notStrictEqual(options, undefined);
assert(Array.isArray(options.warp10), 'you must provide an array of warp 10 clients');
this._cache = cacheClient;
this._warp10Clients = options.warp10;
class BaseTask extends SubSystem {
constructor(config) {
super(config);
// TODO construct cache client here rather than globally
this._cache = null;
this._warp10Clients = null;
this._scheduler = null;
this._defaultSchedule = Now;
this._defaultLag = 0;
}
async _setup(includeDefaultOpts = true) {
if (includeDefaultOpts) {
this._program
.option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
.option(
'-s, --schedule <crontab>',
'Execute task using this crontab. Overrides configured schedule',
value => {
cronparser.parseExpression(value);
return value;
},
)
.option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
.option('-n, --node-id <id>', 'Set a custom node id');
}
async _setup(config) {
this._nodeId = config.nodeId;
this._cache = buildCacheClient(config.cache);
this._warp10Clients = buildWarp10Clients(config.warp10.hosts);
// if (includeDefaultOpts) {
// this._program
// .option('-n, --now', 'Execute the task immediately and then exit. Overrides --schedule.')
// .option(
// '-s, --schedule <crontab>',
// 'Execute task using this crontab. Overrides configured schedule',
// value => {
// cronparser.parseExpression(value);
// return value;
// },
// )
// .option('-l, --lag <lag>', 'Set a custom lag time in seconds', v => parseInt(v, 10))
// .option('-n, --node-id <id>', 'Set a custom node id');
// }
}
get schedule() {
if (this._program.now) {
return Now;
}
if (this._program.schedule) {
return this._program.schedule;
}
// if (this._program.now) {
// return Now;
// }
// if (this._program.schedule) {
// return this._program.schedule;
// }
return this._defaultSchedule;
}
get lag() {
if (this._program.lag !== undefined) {
return this._program.lag;
}
// if (this._program.lag !== undefined) {
// return this._program.lag;
// }
return this._defaultLag;
}

View File

@ -2,7 +2,7 @@ const assert = require('assert');
const async = require('async');
const BaseTask = require('./BaseTask');
const { UtapiMetric } = require('../models');
const config = require('../config');
// const config = require('../config');
const { checkpointLagSecs } = require('../constants');
const {
LoggerContext, shardFromTimestamp, convertTimestamp, InterpolatedClock, now,
@ -15,11 +15,15 @@ const logger = new LoggerContext({
const checkpointLagMicroseconds = convertTimestamp(checkpointLagSecs);
class IngestShardTask extends BaseTask {
constructor(options) {
super(options);
constructor(config, stripEventUUID = true) {
super(config);
this._stripEventUUID = stripEventUUID;
}
async _setup(config) {
await super._setup(config);
this._defaultSchedule = config.ingestionSchedule;
this._defaultLag = config.ingestionLagSeconds;
this._stripEventUUID = options.stripEventUUID !== undefined ? options.stripEventUUID : true;
}
_hydrateEvent(data, stripTimestamp = false) {
@ -77,13 +81,13 @@ class IngestShardTask extends BaseTask {
return warp10.ingest(
{
className: metricClass,
labels: { origin: config.nodeId },
labels: { origin: this._nodeId },
}, records,
);
});
assert.strictEqual(status, records.length);
await this._cache.deleteShard(shard);
logger.info(`ingested ${status} records from ${config.nodeId} into ${ingestedIntoNodeId}`);
logger.info(`ingested ${status} records from ${this._nodeId} into ${ingestedIntoNodeId}`);
} else {
logger.debug('No events found in shard, cleaning up');
}

View File

@ -119,15 +119,22 @@ class Warp10Client {
}
}
const clients = _config.warp10.hosts.map(
val => new Warp10Client({
readToken: _config.warp10.readToken,
writeToken: _config.warp10.writeToken,
...val,
}),
);
function buildWarp10Clients(hosts) {
return hosts.map(
val => new Warp10Client({
readToken: _config.warp10.readToken,
writeToken: _config.warp10.writeToken,
...val,
}),
);
}
// TODO Remove after all users have been moved to building their own clients
const clients = buildWarp10Clients(_config.warp10.hosts);
module.exports = {
Warp10Client,
clients,
buildWarp10Clients,
};