Compare commits
10 Commits
developmen
...
feature/ZE
Author | SHA1 | Date |
---|---|---|
jeremyds | 2a54568b48 | |
jeremyds | 259f29d28d | |
jeremyds | 5d11557b24 | |
jeremyds | 0feafbd19b | |
jeremyds | d786cbd63a | |
jeremyds | 5a2f8e98bd | |
jeremyds | ea6b208ebe | |
jeremyds | d23235c8df | |
jeremyds | 932884a088 | |
jeremyds | d18020fa84 |
|
@ -100,7 +100,7 @@ class ChannelMessageV0 {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a metrics message
|
||||
* Creates a wire representation of a metrics report message
|
||||
*
|
||||
* @param {object} body Metrics report
|
||||
*
|
||||
|
@ -116,6 +116,40 @@ class ChannelMessageV0 {
|
|||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a configuration overlay message
|
||||
*
|
||||
* @param {object} body configuration overlay
|
||||
*
|
||||
* @returns {Buffer} wire representation
|
||||
*/
|
||||
static encodeConfigOverlayMessage(body) {
|
||||
const overlay = JSON.stringify(body);
|
||||
const buf = Buffer.alloc(overlay.length + headerSize);
|
||||
buf.writeUInt8(MessageType.CONFIG_OVERLAY_MESSAGE, 0);
|
||||
buf.writeUInt8(0, 1);
|
||||
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||
buf.write(overlay, headerSize);
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a metric request message
|
||||
*
|
||||
* @param {object} body Metrics request
|
||||
*
|
||||
* @returns {Buffer} wire representation
|
||||
*/
|
||||
static encodeMetricsRequestMessage(body) {
|
||||
const request = JSON.stringify(body);
|
||||
const buf = Buffer.alloc(request.length + headerSize);
|
||||
buf.writeUInt8(MessageType.METRICS_REQUEST_MESSAGE, 0);
|
||||
buf.writeUInt8(0, 1);
|
||||
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||
buf.write(request, headerSize);
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol name used for subprotocol negociation
|
||||
*/
|
||||
|
|
|
@ -82,13 +82,7 @@ function initManagementClient() {
|
|||
});
|
||||
}
|
||||
|
||||
function isManagementAgentUsed() {
|
||||
return process.env.MANAGEMENT_USE_AGENT === '1';
|
||||
}
|
||||
|
||||
|
||||
module.exports = {
|
||||
managementAgentMessageType,
|
||||
initManagementClient,
|
||||
isManagementAgentUsed,
|
||||
};
|
||||
|
|
|
@ -318,33 +318,10 @@ function loadCachedOverlay(log, callback) {
|
|||
});
|
||||
}
|
||||
|
||||
function applyAndSaveOverlay(overlay, log) {
|
||||
patchConfiguration(overlay, log, err => {
|
||||
if (err) {
|
||||
log.error('could not apply pushed overlay', {
|
||||
error: reshapeExceptionError(err),
|
||||
method: 'applyAndSaveOverlay',
|
||||
});
|
||||
return;
|
||||
}
|
||||
saveConfigurationVersion(null, overlay, log, err => {
|
||||
if (err) {
|
||||
log.error('could not cache overlay version', {
|
||||
error: reshapeExceptionError(err),
|
||||
method: 'applyAndSaveOverlay',
|
||||
});
|
||||
return;
|
||||
}
|
||||
log.info('overlay push processed');
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
loadCachedOverlay,
|
||||
managementDatabaseName,
|
||||
patchConfiguration,
|
||||
saveConfigurationVersion,
|
||||
remoteOverlayIsNewer,
|
||||
applyAndSaveOverlay,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/* From RFC6455 which defines some reserved status code ranges, the range
|
||||
* 4000-4999 are for private use. */
|
||||
const WS_STATUS_IDLE = {
|
||||
code: 4000,
|
||||
reason: 'does not reply to ping before timeout',
|
||||
};
|
||||
|
||||
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS =
|
||||
process.env.MANAGEMENT_CHECK_CLIENT_FQCY_MS || 15000;
|
||||
|
||||
const endpointPath = 'api/v1/instance';
|
||||
|
||||
const managementEndpointRoot =
|
||||
process.env.MANAGEMENT_ENDPOINT ||
|
||||
'https://api.zenko.io';
|
||||
const managementEndpoint = `${managementEndpointRoot}/${endpointPath}`;
|
||||
|
||||
const pushEndpointRoot =
|
||||
process.env.PUSH_ENDPOINT ||
|
||||
'https://push.api.zenko.io';
|
||||
const pushEndpoint = `${pushEndpointRoot}/${endpointPath}`;
|
||||
|
||||
|
||||
module.exports = {
|
||||
WS_STATUS_IDLE,
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||
managementEndpoint,
|
||||
pushEndpoint,
|
||||
};
|
|
@ -3,6 +3,7 @@ const forge = require('node-forge');
|
|||
const request = require('request');
|
||||
|
||||
const metadata = require('../metadata/wrapper');
|
||||
const { managementEndpoint } = require('./constants');
|
||||
|
||||
const managementDatabaseName = 'PENSIEVE';
|
||||
const tokenConfigurationKey = 'auth/zenko/remote-management-token';
|
||||
|
@ -25,7 +26,7 @@ function getStoredCredentials(log, callback) {
|
|||
log, callback);
|
||||
}
|
||||
|
||||
function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
||||
function issueCredentials(instanceId, log, callback) {
|
||||
log.info('registering with API to get token');
|
||||
|
||||
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
|
||||
|
@ -54,8 +55,7 @@ function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
|||
}).json(postData);
|
||||
}
|
||||
|
||||
function confirmInstanceCredentials(
|
||||
managementEndpoint, instanceId, creds, log, callback) {
|
||||
function confirmInstanceCredentials(instanceId, creds, log, callback) {
|
||||
const opts = {
|
||||
headers: {
|
||||
'x-instance-authentication-token': creds.token,
|
||||
|
@ -84,7 +84,6 @@ function confirmInstanceCredentials(
|
|||
* is registered as new against the Orbit API with newly-generated
|
||||
* RSA key pair.
|
||||
*
|
||||
* @param {string} managementEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||
* initialization process
|
||||
|
@ -92,12 +91,11 @@ function confirmInstanceCredentials(
|
|||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function initManagementCredentials(
|
||||
managementEndpoint, instanceId, log, callback) {
|
||||
function initManagementCredentials(instanceId, log, callback) {
|
||||
getStoredCredentials(log, (error, value) => {
|
||||
if (error) {
|
||||
if (error.NoSuchKey) {
|
||||
return issueCredentials(managementEndpoint, instanceId, log,
|
||||
return issueCredentials(instanceId, log,
|
||||
(error, value) => {
|
||||
if (error) {
|
||||
log.error('could not issue token',
|
||||
|
@ -118,8 +116,7 @@ function initManagementCredentials(
|
|||
log.info('saved token locally, ' +
|
||||
'confirming instance');
|
||||
return confirmInstanceCredentials(
|
||||
managementEndpoint, instanceId, value, log,
|
||||
callback);
|
||||
instanceId, value, log, callback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
const arsenal = require('arsenal');
|
||||
const errors = arsenal.errors;
|
||||
const async = require('async');
|
||||
|
||||
const metadata = require('../metadata/wrapper');
|
||||
|
@ -13,19 +14,9 @@ const { initManagementCredentials } = require('./credentials');
|
|||
const { startWSManagementClient } = require('./push');
|
||||
const { startPollingManagementClient } = require('./poll');
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
const { isManagementAgentUsed } = require('./agentClient');
|
||||
|
||||
const initRemoteManagementRetryDelay = 10000;
|
||||
|
||||
const managementEndpointRoot =
|
||||
process.env.MANAGEMENT_ENDPOINT ||
|
||||
'https://api.zenko.io';
|
||||
const managementEndpoint = `${managementEndpointRoot}/api/v1/instance`;
|
||||
|
||||
const pushEndpointRoot =
|
||||
process.env.PUSH_ENDPOINT ||
|
||||
'https://push.api.zenko.io';
|
||||
const pushEndpoint = `${pushEndpointRoot}/api/v1/instance`;
|
||||
|
||||
function initManagementDatabase(log, callback) {
|
||||
// XXX choose proper owner names
|
||||
|
@ -48,12 +39,12 @@ function initManagementDatabase(log, callback) {
|
|||
});
|
||||
}
|
||||
|
||||
function startManagementListeners(instanceId, token) {
|
||||
function startManagementListeners(instanceId, newOverlayCallback, token) {
|
||||
const mode = process.env.MANAGEMENT_MODE || 'push';
|
||||
if (mode === 'push') {
|
||||
startWSManagementClient(pushEndpoint, instanceId, token);
|
||||
startWSManagementClient(instanceId, token, newOverlayCallback);
|
||||
} else {
|
||||
startPollingManagementClient(managementEndpoint, instanceId, token);
|
||||
startPollingManagementClient(instanceId, token, newOverlayCallback);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,14 +57,15 @@ function startManagementListeners(instanceId, token) {
|
|||
* - loading and applying the latest cached overlay configuration
|
||||
* - starting a configuration update and metrics push background task
|
||||
*
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||
* initialization process
|
||||
* @param {function} callback Function to call once the overlay is loaded
|
||||
* (overlay)
|
||||
*
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to
|
||||
* trace initialization process
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay
|
||||
* is received (overlay)
|
||||
* @param {function} callback Function to call once the overlay
|
||||
* is loaded (overlay)
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function initManagement(log, callback) {
|
||||
function initManagement(log, newOverlayCallback, callback) {
|
||||
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
||||
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
||||
|| process.env.S3BACKEND === 'mem') {
|
||||
|
@ -81,34 +73,28 @@ function initManagement(log, callback) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* Temporary check before to fully move to the process management agent. */
|
||||
if (isManagementAgentUsed() ^ typeof callback === 'function') {
|
||||
let msg = 'misuse of initManagement function: ';
|
||||
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
|
||||
msg += `, callback type: ${typeof callback}`;
|
||||
throw new Error(msg);
|
||||
}
|
||||
|
||||
async.waterfall([
|
||||
// eslint-disable-next-line arrow-body-style
|
||||
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
|
||||
cb => metadata.setup(cb),
|
||||
cb => initManagementDatabase(log, cb),
|
||||
cb => metadata.getUUID(log, cb),
|
||||
(instanceId, cb) => initManagementCredentials(
|
||||
managementEndpoint, instanceId, log, cb),
|
||||
(instanceId, token, cb) => {
|
||||
if (!isManagementAgentUsed()) {
|
||||
cb(null, instanceId, token, {});
|
||||
return;
|
||||
(instanceId, cb) => {
|
||||
const initialID = process.env.INITIAL_INSTANCE_ID;
|
||||
if (initialID && initialID !== instanceId) {
|
||||
log.error('INITIAL_INSTANCE_ID value is different from ' +
|
||||
'instance id saved in metadata', {
|
||||
envVarValue: initialID,
|
||||
metadataValue: instanceId,
|
||||
});
|
||||
return callback(errors.InvalidParameterValue);
|
||||
}
|
||||
return cb(null, instanceId);
|
||||
},
|
||||
(instanceId, cb) => initManagementCredentials(instanceId, log, cb),
|
||||
(instanceId, token, cb) => {
|
||||
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
|
||||
token, overlay));
|
||||
},
|
||||
(instanceId, token, overlay, cb) => {
|
||||
if (!isManagementAgentUsed()) {
|
||||
cb(null, instanceId, token, overlay);
|
||||
return;
|
||||
}
|
||||
patchConfiguration(overlay, log,
|
||||
err => cb(err, instanceId, token, overlay));
|
||||
},
|
||||
|
@ -119,13 +105,16 @@ function initManagement(log, callback) {
|
|||
method: 'initManagement' });
|
||||
setTimeout(initManagement,
|
||||
initRemoteManagementRetryDelay,
|
||||
logger.newRequestLogger());
|
||||
logger.newRequestLogger(),
|
||||
newOverlayCallback,
|
||||
callback
|
||||
);
|
||||
} else {
|
||||
log.info(`this deployment's Instance ID is ${instanceId}`);
|
||||
log.end('management init done');
|
||||
startManagementListeners(instanceId, token);
|
||||
startManagementListeners(instanceId, newOverlayCallback, token);
|
||||
if (callback) {
|
||||
callback(overlay);
|
||||
callback(null, overlay);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -7,16 +7,17 @@ const logger = require('../utilities/logger');
|
|||
const metadata = require('../metadata/wrapper');
|
||||
const {
|
||||
loadCachedOverlay,
|
||||
patchConfiguration,
|
||||
saveConfigurationVersion,
|
||||
} = require('./configuration');
|
||||
const { managementEndpoint } = require('./constants');
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
|
||||
const pushReportDelay = 30000;
|
||||
const pullConfigurationOverlayDelay = 60000;
|
||||
|
||||
function loadRemoteOverlay(
|
||||
managementEndpoint, instanceId, remoteToken, cachedOverlay, log, cb) {
|
||||
let overlayMessageListener = null;
|
||||
let cachedOverlay = null;
|
||||
|
||||
function loadRemoteOverlay(instanceId, remoteToken, log, cb) {
|
||||
log.debug('loading remote overlay');
|
||||
const opts = {
|
||||
headers: {
|
||||
|
@ -30,29 +31,34 @@ function loadRemoteOverlay(
|
|||
return cb(error);
|
||||
}
|
||||
if (response.statusCode === 200) {
|
||||
return cb(null, cachedOverlay, body);
|
||||
return cb(null, body);
|
||||
}
|
||||
if (response.statusCode === 404) {
|
||||
return cb(null, cachedOverlay, {});
|
||||
return cb(null, {});
|
||||
}
|
||||
return cb(arsenal.errors.AccessForbidden, cachedOverlay, {});
|
||||
return cb(arsenal.errors.AccessForbidden, {});
|
||||
}).json();
|
||||
}
|
||||
|
||||
// TODO save only after successful patch
|
||||
function applyConfigurationOverlay(
|
||||
managementEndpoint, instanceId, remoteToken, log) {
|
||||
function applyConfigurationOverlay(instanceId, remoteToken, log) {
|
||||
async.waterfall([
|
||||
wcb => loadCachedOverlay(log, wcb),
|
||||
(cachedOverlay, wcb) => patchConfiguration(cachedOverlay,
|
||||
log, wcb),
|
||||
(cachedOverlay, wcb) =>
|
||||
loadRemoteOverlay(managementEndpoint, instanceId, remoteToken,
|
||||
cachedOverlay, log, wcb),
|
||||
(cachedOverlay, remoteOverlay, wcb) =>
|
||||
saveConfigurationVersion(cachedOverlay, remoteOverlay, log, wcb),
|
||||
(remoteOverlay, wcb) => patchConfiguration(remoteOverlay,
|
||||
log, wcb),
|
||||
wcb => {
|
||||
if (cachedOverlay) {
|
||||
wcb(null, cachedOverlay);
|
||||
} else {
|
||||
loadCachedOverlay(log, wcb);
|
||||
}
|
||||
},
|
||||
overlay => { cachedOverlay = overlay; },
|
||||
wcb => loadRemoteOverlay(instanceId, remoteToken,
|
||||
log, wcb),
|
||||
remoteOverlay => {
|
||||
cachedOverlay = remoteOverlay;
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(JSON.stringify(cachedOverlay));
|
||||
}
|
||||
},
|
||||
], error => {
|
||||
if (error) {
|
||||
log.error('could not apply managed configuration',
|
||||
|
@ -60,12 +66,11 @@ function applyConfigurationOverlay(
|
|||
method: 'applyConfigurationOverlay' });
|
||||
}
|
||||
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
|
||||
managementEndpoint, instanceId, remoteToken,
|
||||
logger.newRequestLogger());
|
||||
instanceId, remoteToken, logger.newRequestLogger());
|
||||
});
|
||||
}
|
||||
|
||||
function postStats(managementEndpoint, instanceId, remoteToken, next) {
|
||||
function postStats(instanceId, remoteToken, next) {
|
||||
const toURL = `${managementEndpoint}/${instanceId}/stats`;
|
||||
const toOptions = {
|
||||
headers: {
|
||||
|
@ -99,14 +104,19 @@ function getStats() {
|
|||
return request(fromURL, fromOptions).json();
|
||||
}
|
||||
|
||||
function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
||||
function pushStats(instanceId, remoteToken, next) {
|
||||
if (process.env.PUSH_STATS === 'false') {
|
||||
return;
|
||||
}
|
||||
getStats().pipe(
|
||||
postStats(managementEndpoint, instanceId, remoteToken, next));
|
||||
setTimeout(pushStats, pushReportDelay,
|
||||
managementEndpoint, instanceId, remoteToken);
|
||||
getStats()
|
||||
.on('error', err => {
|
||||
/* If the management process launches too quick, the cloud server may
|
||||
* not be listening yet. */
|
||||
logger.info('failed to get stats', { err });
|
||||
})
|
||||
.pipe(
|
||||
postStats(instanceId, remoteToken, next));
|
||||
setTimeout(pushStats, pushReportDelay, instanceId, remoteToken);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,20 +125,23 @@ function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
|||
* Periodically polls for configuration updates, and pushes stats at
|
||||
* a fixed interval.
|
||||
*
|
||||
* @param {string} managementEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} remoteToken API authentication token
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||
* loaded (overlay)
|
||||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function startPollingManagementClient(
|
||||
managementEndpoint, instanceId, remoteToken) {
|
||||
instanceId, remoteToken, newOverlayCallback) {
|
||||
overlayMessageListener = newOverlayCallback;
|
||||
|
||||
metadata.notifyBucketChange(() => {
|
||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
||||
pushStats(instanceId, remoteToken);
|
||||
});
|
||||
|
||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
||||
applyConfigurationOverlay(managementEndpoint, instanceId, remoteToken,
|
||||
pushStats(instanceId, remoteToken);
|
||||
applyConfigurationOverlay(instanceId, remoteToken,
|
||||
logger.newRequestLogger());
|
||||
}
|
||||
|
||||
|
|
|
@ -4,19 +4,16 @@ const net = require('net');
|
|||
const request = require('request');
|
||||
const { URL } = require('url');
|
||||
const WebSocket = require('ws');
|
||||
const assert = require('assert');
|
||||
|
||||
const _config = require('../Config').config;
|
||||
const logger = require('../utilities/logger');
|
||||
const metadata = require('../metadata/wrapper');
|
||||
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
const { isManagementAgentUsed } = require('./agentClient');
|
||||
const { applyAndSaveOverlay } = require('./configuration');
|
||||
const {
|
||||
ChannelMessageV0,
|
||||
MessageType,
|
||||
} = require('./ChannelMessageV0');
|
||||
const { pushEndpoint } = require('./constants');
|
||||
|
||||
const {
|
||||
CONFIG_OVERLAY_MESSAGE,
|
||||
|
@ -65,19 +62,23 @@ function createWSAgent(pushEndpoint, env, log) {
|
|||
return null;
|
||||
}
|
||||
|
||||
function pushEndpointUrlFromInstanceId(instanceId) {
|
||||
return `${pushEndpoint}/${instanceId}/ws`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts background task that updates configuration and pushes stats.
|
||||
*
|
||||
* Receives pushed Websocket messages on configuration updates, and
|
||||
* sends stat messages in response to API sollicitations.
|
||||
*
|
||||
* @param {string} pushEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} token API authentication token
|
||||
*
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} token API authentication token
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||
* loaded (overlay)
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||
function startWSManagementClient(instanceId, token, newOverlayCallback) {
|
||||
logger.info('connecting to push server');
|
||||
function _logError(error, errorMessage, method) {
|
||||
if (error) {
|
||||
|
@ -86,14 +87,18 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
}
|
||||
}
|
||||
|
||||
overlayMessageListener = newOverlayCallback;
|
||||
|
||||
const socketsByChannelId = [];
|
||||
const headers = {
|
||||
'x-instance-authentication-token': token,
|
||||
};
|
||||
const agent = createWSAgent(pushEndpoint, process.env, logger);
|
||||
|
||||
const url = `${pushEndpoint}/${instanceId}/ws`;
|
||||
const url = pushEndpointUrlFromInstanceId(instanceId);
|
||||
|
||||
const ws = new WebSocket(url, subprotocols, { headers, agent });
|
||||
|
||||
let pingTimeout = null;
|
||||
|
||||
function sendPing() {
|
||||
|
@ -112,7 +117,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
if (process.env.PUSH_STATS === 'false') {
|
||||
return;
|
||||
}
|
||||
const fromURL = `http://localhost:${_config.port}/_/report`;
|
||||
const fromURL = process.env.STAT_REPORT_URL ||
|
||||
`http://localhost:${_config.port}/_/report`;
|
||||
const fromOptions = {
|
||||
headers: {
|
||||
'x-scal-report-token': process.env.REPORT_TOKEN,
|
||||
|
@ -121,7 +127,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
};
|
||||
request(fromURL, fromOptions, (err, response, body) => {
|
||||
if (err) {
|
||||
_logError(err, 'failed to get metrics report', 'pushStats');
|
||||
const what = 'failed to get metrics report';
|
||||
const msg =
|
||||
`${what} at ${fromURL} with headers ${fromOptions}`;
|
||||
_logError(err, msg, 'pushStats');
|
||||
return;
|
||||
}
|
||||
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
||||
|
@ -204,8 +213,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
metadata.notifyBucketChange(null);
|
||||
_config.removeListener('browser-access-enabled-change',
|
||||
browserAccessChangeHandler);
|
||||
setTimeout(startWSManagementClient, 10000, pushEndpoint,
|
||||
instanceId, token);
|
||||
const timeout = process.env.ORBIT_CONNECTION_TIMEOUT_MS || 10000;
|
||||
|
||||
setTimeout(startWSManagementClient, timeout, instanceId, token,
|
||||
newOverlayCallback);
|
||||
});
|
||||
|
||||
ws.on('error', err => {
|
||||
|
@ -224,17 +235,12 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
});
|
||||
|
||||
ws.on('message', data => {
|
||||
const log = logger.newRequestLogger();
|
||||
const message = new ChannelMessageV0(data);
|
||||
|
||||
switch (message.getType()) {
|
||||
case CONFIG_OVERLAY_MESSAGE:
|
||||
if (!isManagementAgentUsed()) {
|
||||
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
|
||||
} else {
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(message.getPayload().toString());
|
||||
}
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(message.getPayload().toString());
|
||||
}
|
||||
break;
|
||||
case METRICS_REQUEST_MESSAGE:
|
||||
|
@ -256,13 +262,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
});
|
||||
}
|
||||
|
||||
function addOverlayMessageListener(callback) {
|
||||
assert(typeof callback === 'function');
|
||||
overlayMessageListener = callback;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
createWSAgent,
|
||||
startWSManagementClient,
|
||||
addOverlayMessageListener,
|
||||
pushEndpointUrlFromInstanceId,
|
||||
};
|
||||
|
|
|
@ -14,11 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
|
|||
const api = require('./api/api');
|
||||
const data = require('./data/wrapper');
|
||||
const metadata = require('./metadata/wrapper');
|
||||
const { initManagement } = require('./management');
|
||||
const {
|
||||
initManagementClient,
|
||||
isManagementAgentUsed,
|
||||
} = require('./management/agentClient');
|
||||
const { initManagementClient } = require('./management/agentClient');
|
||||
|
||||
const routes = arsenal.s3routes.routes;
|
||||
const websiteEndpoints = _config.websiteEndpoints;
|
||||
|
@ -40,7 +36,6 @@ const STATS_INTERVAL = 5; // 5 seconds
|
|||
const STATS_EXPIRY = 30; // 30 seconds
|
||||
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
|
||||
STATS_EXPIRY);
|
||||
const enableRemoteManagement = true;
|
||||
|
||||
class S3Server {
|
||||
/**
|
||||
|
@ -153,15 +148,7 @@ class S3Server {
|
|||
|
||||
// TODO this should wait for metadata healthcheck to be ok
|
||||
// TODO only do this in cluster master
|
||||
if (enableRemoteManagement) {
|
||||
if (!isManagementAgentUsed()) {
|
||||
setTimeout(() => {
|
||||
initManagement(logger.newRequestLogger());
|
||||
}, 5000);
|
||||
} else {
|
||||
initManagementClient();
|
||||
}
|
||||
}
|
||||
initManagementClient();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -5,15 +5,16 @@ const logger = require('./lib/utilities/logger');
|
|||
const { initManagement } = require('./lib/management');
|
||||
const _config = require('./lib/Config').config;
|
||||
const { managementAgentMessageType } = require('./lib/management/agentClient');
|
||||
const { addOverlayMessageListener } = require('./lib/management/push');
|
||||
const { saveConfigurationVersion } = require('./lib/management/configuration');
|
||||
const {
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||
WS_STATUS_IDLE,
|
||||
} = require('./lib/management/constants');
|
||||
|
||||
|
||||
// TODO: auth?
|
||||
// TODO: werelogs with a specific name.
|
||||
|
||||
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
|
||||
|
||||
|
||||
class ManagementAgentServer {
|
||||
constructor() {
|
||||
|
@ -29,25 +30,21 @@ class ManagementAgentServer {
|
|||
process.on('SIGPIPE', () => {});
|
||||
}
|
||||
|
||||
start(_cb) {
|
||||
const cb = _cb || function noop() {};
|
||||
|
||||
start() {
|
||||
/* Define REPORT_TOKEN env variable needed by the management
|
||||
* module. */
|
||||
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|
||||
|| _config.reportToken
|
||||
|| Uuid.v4();
|
||||
|
||||
initManagement(logger.newRequestLogger(), overlay => {
|
||||
let error = null;
|
||||
|
||||
if (overlay) {
|
||||
this.loadedOverlay = overlay;
|
||||
this.startServer();
|
||||
} else {
|
||||
error = new Error('failed to init management');
|
||||
/* The initManegement function retries when it fails. */
|
||||
const log = logger.newRequestLogger();
|
||||
initManagement(log, this.onNewOverlay.bind(this), (err, overlay) => {
|
||||
if (err) {
|
||||
process.exit(0);
|
||||
}
|
||||
return cb(error);
|
||||
this.loadedOverlay = overlay;
|
||||
this.startServer();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -75,8 +72,6 @@ class ManagementAgentServer {
|
|||
|
||||
setInterval(this.checkBrokenConnections.bind(this),
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||
|
||||
addOverlayMessageListener(this.onNewOverlay.bind(this));
|
||||
}
|
||||
|
||||
onConnection(socket, request) {
|
||||
|
@ -154,6 +149,14 @@ class ManagementAgentServer {
|
|||
return;
|
||||
}
|
||||
this.loadedOverlay = remoteOverlayObj;
|
||||
if (this.loadedOverlay.browserAccess) {
|
||||
if (Boolean(_config.browserAccessEnabled) !==
|
||||
Boolean(this.loadedOverlay.browserAccess.enabled)) {
|
||||
_config.browserAccessEnabled =
|
||||
Boolean(this.loadedOverlay.browserAccess.enabled);
|
||||
_config.emit('browser-access-enabled-change');
|
||||
}
|
||||
}
|
||||
this.wss.clients.forEach(
|
||||
this._sendNewOverlayToClient.bind(this)
|
||||
);
|
||||
|
@ -166,7 +169,7 @@ class ManagementAgentServer {
|
|||
logger.info('close broken connection', {
|
||||
client: client._socket._peername,
|
||||
});
|
||||
client.terminate();
|
||||
client.close(WS_STATUS_IDLE.code, WS_STATUS_IDLE.reason);
|
||||
return;
|
||||
}
|
||||
client.isAlive = false;
|
||||
|
|
|
@ -89,12 +89,13 @@
|
|||
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||
"mem_backend": "S3BACKEND=mem node index.js",
|
||||
"perf": "mocha tests/performance/s3standard.js",
|
||||
"start": "npm-run-all --parallel start_dmd start_s3server",
|
||||
"start": "npm-run-all --parallel start_dmd start_management start_s3server",
|
||||
"start_mongo": "npm run cloudserver",
|
||||
"start_mdserver": "node mdserver.js",
|
||||
"start_dataserver": "node dataserver.js",
|
||||
"start_s3server": "node index.js",
|
||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||
"start_management": "node managementAgent.js",
|
||||
"start_utapi": "node lib/utapi/utapi.js",
|
||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
||||
"management_agent": "node managementAgent.js",
|
||||
|
|
Loading…
Reference in New Issue