Compare commits

...

10 Commits

Author SHA1 Message Date
jeremyds 2a54568b48 ft: ZENKO-716: management process, handle browserAccess change
The overlay sent by orbit to Zenko contains a "browserAccess" field.
When this parameter is set to false, payload message sent by Orbit
should not be forwarded. With the code move from the cloudserver to the
new management process, this value should be handled and the appropriate
configuration event should be emitted on value change.
2018-10-23 13:46:01 -07:00
jeremyds 259f29d28d ft: ZENKO-716: fix infinite loop in initManagement.
The initManagement function has an automatic retry when it fails. The
problem is that when an INITIAL_INSTANCE_ID environment variable is
passed and there is already an instance ID saved in metadata and these
two values are different, the initManagement will retry infinitely.

This commit catches this situation and make the initManagement function
stop to retry and instead it raised an error to upper level. This error
is now caught in upper level and leads to a process exit.
2018-10-23 13:46:01 -07:00
jeremyds 5d11557b24 ft: ZENKO-716: fix setTimeout 2018-10-23 13:46:01 -07:00
jeremyds 0feafbd19b ft: ZENKO-716: endpoint value 2018-10-23 13:46:01 -07:00
jeremyds d786cbd63a ft: ZENKO-716: new management process poll mode
This commit fixes the management pull mode of the new management
process. We don't need anymore to save and patch the overlay into
lib/management/poll.

Furthermore, like for the push mode, a callback should be fired when a
new overlay is received, to notify the upper layer. The initManagement
takes this new callback as parameter and the previous listener
registering in lib/management/push is no longer required.
2018-10-23 13:45:54 -07:00
jeremyds 5a2f8e98bd ft: ZENKO-716: rename env variablle for coherence. 2018-10-23 13:45:54 -07:00
jeremyds ea6b208ebe ft: ZENKO-716: management, use constant and env variables for tests 2018-10-23 13:45:54 -07:00
jeremyds d23235c8df ft: ZENKO-716: add management message encoding helper function
Testing the management process feature will require to send this kind of
message.
2018-10-23 13:45:54 -07:00
jeremyds 932884a088 ft: ZENKO-716: mninor error log improvement
Add the url and the headers in the error log used when the stat push
request from management to orbit fails.
2018-10-23 13:45:54 -07:00
jeremyds d18020fa84 ft: ZENKO-716: stop using management code in S3 2018-10-23 13:45:53 -07:00
11 changed files with 199 additions and 174 deletions

View File

@ -100,7 +100,7 @@ class ChannelMessageV0 {
} }
/** /**
* Creates a wire representation of a metrics message * Creates a wire representation of a metrics report message
* *
* @param {object} body Metrics report * @param {object} body Metrics report
* *
@ -116,6 +116,40 @@ class ChannelMessageV0 {
return buf; return buf;
} }
/**
* Creates a wire representation of a configuration overlay message
*
* @param {object} body configuration overlay
*
* @returns {Buffer} wire representation
*/
static encodeConfigOverlayMessage(body) {
const overlay = JSON.stringify(body);
const buf = Buffer.alloc(overlay.length + headerSize);
buf.writeUInt8(MessageType.CONFIG_OVERLAY_MESSAGE, 0);
buf.writeUInt8(0, 1);
buf.writeUInt8(TargetType.TARGET_ANY, 2);
buf.write(overlay, headerSize);
return buf;
}
/**
* Creates a wire representation of a metric request message
*
* @param {object} body Metrics request
*
* @returns {Buffer} wire representation
*/
static encodeMetricsRequestMessage(body) {
const request = JSON.stringify(body);
const buf = Buffer.alloc(request.length + headerSize);
buf.writeUInt8(MessageType.METRICS_REQUEST_MESSAGE, 0);
buf.writeUInt8(0, 1);
buf.writeUInt8(TargetType.TARGET_ANY, 2);
buf.write(request, headerSize);
return buf;
}
/** /**
* Protocol name used for subprotocol negociation * Protocol name used for subprotocol negociation
*/ */

View File

@ -82,13 +82,7 @@ function initManagementClient() {
}); });
} }
function isManagementAgentUsed() {
return process.env.MANAGEMENT_USE_AGENT === '1';
}
module.exports = { module.exports = {
managementAgentMessageType, managementAgentMessageType,
initManagementClient, initManagementClient,
isManagementAgentUsed,
}; };

View File

@ -318,33 +318,10 @@ function loadCachedOverlay(log, callback) {
}); });
} }
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
log.info('overlay push processed');
});
});
}
module.exports = { module.exports = {
loadCachedOverlay, loadCachedOverlay,
managementDatabaseName, managementDatabaseName,
patchConfiguration, patchConfiguration,
saveConfigurationVersion, saveConfigurationVersion,
remoteOverlayIsNewer, remoteOverlayIsNewer,
applyAndSaveOverlay,
}; };

View File

@ -0,0 +1,29 @@
/* From RFC6455 which defines some reserved status code ranges, the range
* 4000-4999 are for private use. */
const WS_STATUS_IDLE = {
code: 4000,
reason: 'does not reply to ping before timeout',
};
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS =
process.env.MANAGEMENT_CHECK_CLIENT_FQCY_MS || 15000;
const endpointPath = 'api/v1/instance';
const managementEndpointRoot =
process.env.MANAGEMENT_ENDPOINT ||
'https://api.zenko.io';
const managementEndpoint = `${managementEndpointRoot}/${endpointPath}`;
const pushEndpointRoot =
process.env.PUSH_ENDPOINT ||
'https://push.api.zenko.io';
const pushEndpoint = `${pushEndpointRoot}/${endpointPath}`;
module.exports = {
WS_STATUS_IDLE,
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
managementEndpoint,
pushEndpoint,
};

View File

@ -3,6 +3,7 @@ const forge = require('node-forge');
const request = require('request'); const request = require('request');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { managementEndpoint } = require('./constants');
const managementDatabaseName = 'PENSIEVE'; const managementDatabaseName = 'PENSIEVE';
const tokenConfigurationKey = 'auth/zenko/remote-management-token'; const tokenConfigurationKey = 'auth/zenko/remote-management-token';
@ -25,7 +26,7 @@ function getStoredCredentials(log, callback) {
log, callback); log, callback);
} }
function issueCredentials(managementEndpoint, instanceId, log, callback) { function issueCredentials(instanceId, log, callback) {
log.info('registering with API to get token'); log.info('registering with API to get token');
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 }); const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
@ -54,8 +55,7 @@ function issueCredentials(managementEndpoint, instanceId, log, callback) {
}).json(postData); }).json(postData);
} }
function confirmInstanceCredentials( function confirmInstanceCredentials(instanceId, creds, log, callback) {
managementEndpoint, instanceId, creds, log, callback) {
const opts = { const opts = {
headers: { headers: {
'x-instance-authentication-token': creds.token, 'x-instance-authentication-token': creds.token,
@ -84,7 +84,6 @@ function confirmInstanceCredentials(
* is registered as new against the Orbit API with newly-generated * is registered as new against the Orbit API with newly-generated
* RSA key pair. * RSA key pair.
* *
* @param {string} managementEndpoint API endpoint
* @param {string} instanceId UUID of this deployment * @param {string} instanceId UUID of this deployment
* @param {werelogs~Logger} log Request-scoped logger to be able to trace * @param {werelogs~Logger} log Request-scoped logger to be able to trace
* initialization process * initialization process
@ -92,12 +91,11 @@ function confirmInstanceCredentials(
* *
* @returns {undefined} * @returns {undefined}
*/ */
function initManagementCredentials( function initManagementCredentials(instanceId, log, callback) {
managementEndpoint, instanceId, log, callback) {
getStoredCredentials(log, (error, value) => { getStoredCredentials(log, (error, value) => {
if (error) { if (error) {
if (error.NoSuchKey) { if (error.NoSuchKey) {
return issueCredentials(managementEndpoint, instanceId, log, return issueCredentials(instanceId, log,
(error, value) => { (error, value) => {
if (error) { if (error) {
log.error('could not issue token', log.error('could not issue token',
@ -118,8 +116,7 @@ function initManagementCredentials(
log.info('saved token locally, ' + log.info('saved token locally, ' +
'confirming instance'); 'confirming instance');
return confirmInstanceCredentials( return confirmInstanceCredentials(
managementEndpoint, instanceId, value, log, instanceId, value, log, callback);
callback);
}); });
}); });
} }

View File

@ -1,4 +1,5 @@
const arsenal = require('arsenal'); const arsenal = require('arsenal');
const errors = arsenal.errors;
const async = require('async'); const async = require('async');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
@ -13,19 +14,9 @@ const { initManagementCredentials } = require('./credentials');
const { startWSManagementClient } = require('./push'); const { startWSManagementClient } = require('./push');
const { startPollingManagementClient } = require('./poll'); const { startPollingManagementClient } = require('./poll');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const { isManagementAgentUsed } = require('./agentClient');
const initRemoteManagementRetryDelay = 10000; const initRemoteManagementRetryDelay = 10000;
const managementEndpointRoot =
process.env.MANAGEMENT_ENDPOINT ||
'https://api.zenko.io';
const managementEndpoint = `${managementEndpointRoot}/api/v1/instance`;
const pushEndpointRoot =
process.env.PUSH_ENDPOINT ||
'https://push.api.zenko.io';
const pushEndpoint = `${pushEndpointRoot}/api/v1/instance`;
function initManagementDatabase(log, callback) { function initManagementDatabase(log, callback) {
// XXX choose proper owner names // XXX choose proper owner names
@ -48,12 +39,12 @@ function initManagementDatabase(log, callback) {
}); });
} }
function startManagementListeners(instanceId, token) { function startManagementListeners(instanceId, newOverlayCallback, token) {
const mode = process.env.MANAGEMENT_MODE || 'push'; const mode = process.env.MANAGEMENT_MODE || 'push';
if (mode === 'push') { if (mode === 'push') {
startWSManagementClient(pushEndpoint, instanceId, token); startWSManagementClient(instanceId, token, newOverlayCallback);
} else { } else {
startPollingManagementClient(managementEndpoint, instanceId, token); startPollingManagementClient(instanceId, token, newOverlayCallback);
} }
} }
@ -66,14 +57,15 @@ function startManagementListeners(instanceId, token) {
* - loading and applying the latest cached overlay configuration * - loading and applying the latest cached overlay configuration
* - starting a configuration update and metrics push background task * - starting a configuration update and metrics push background task
* *
* @param {werelogs~Logger} log Request-scoped logger to be able to trace * @param {werelogs~Logger} log Request-scoped logger to be able to
* initialization process * trace initialization process
* @param {function} callback Function to call once the overlay is loaded * @param {function} newOverlayCallback Function to call once a new overlay
* (overlay) * is received (overlay)
* * @param {function} callback Function to call once the overlay
* is loaded (overlay)
* @returns {undefined} * @returns {undefined}
*/ */
function initManagement(log, callback) { function initManagement(log, newOverlayCallback, callback) {
if ((process.env.REMOTE_MANAGEMENT_DISABLE && if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
process.env.REMOTE_MANAGEMENT_DISABLE !== '0') process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|| process.env.S3BACKEND === 'mem') { || process.env.S3BACKEND === 'mem') {
@ -81,34 +73,28 @@ function initManagement(log, callback) {
return; return;
} }
/* Temporary check before to fully move to the process management agent. */
if (isManagementAgentUsed() ^ typeof callback === 'function') {
let msg = 'misuse of initManagement function: ';
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
msg += `, callback type: ${typeof callback}`;
throw new Error(msg);
}
async.waterfall([ async.waterfall([
// eslint-disable-next-line arrow-body-style cb => metadata.setup(cb),
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
cb => initManagementDatabase(log, cb), cb => initManagementDatabase(log, cb),
cb => metadata.getUUID(log, cb), cb => metadata.getUUID(log, cb),
(instanceId, cb) => initManagementCredentials( (instanceId, cb) => {
managementEndpoint, instanceId, log, cb), const initialID = process.env.INITIAL_INSTANCE_ID;
(instanceId, token, cb) => { if (initialID && initialID !== instanceId) {
if (!isManagementAgentUsed()) { log.error('INITIAL_INSTANCE_ID value is different from ' +
cb(null, instanceId, token, {}); 'instance id saved in metadata', {
return; envVarValue: initialID,
metadataValue: instanceId,
});
return callback(errors.InvalidParameterValue);
} }
return cb(null, instanceId);
},
(instanceId, cb) => initManagementCredentials(instanceId, log, cb),
(instanceId, token, cb) => {
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId, loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
token, overlay)); token, overlay));
}, },
(instanceId, token, overlay, cb) => { (instanceId, token, overlay, cb) => {
if (!isManagementAgentUsed()) {
cb(null, instanceId, token, overlay);
return;
}
patchConfiguration(overlay, log, patchConfiguration(overlay, log,
err => cb(err, instanceId, token, overlay)); err => cb(err, instanceId, token, overlay));
}, },
@ -119,13 +105,16 @@ function initManagement(log, callback) {
method: 'initManagement' }); method: 'initManagement' });
setTimeout(initManagement, setTimeout(initManagement,
initRemoteManagementRetryDelay, initRemoteManagementRetryDelay,
logger.newRequestLogger()); logger.newRequestLogger(),
newOverlayCallback,
callback
);
} else { } else {
log.info(`this deployment's Instance ID is ${instanceId}`); log.info(`this deployment's Instance ID is ${instanceId}`);
log.end('management init done'); log.end('management init done');
startManagementListeners(instanceId, token); startManagementListeners(instanceId, newOverlayCallback, token);
if (callback) { if (callback) {
callback(overlay); callback(null, overlay);
} }
} }
}); });

View File

@ -7,16 +7,17 @@ const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { const {
loadCachedOverlay, loadCachedOverlay,
patchConfiguration,
saveConfigurationVersion,
} = require('./configuration'); } = require('./configuration');
const { managementEndpoint } = require('./constants');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const pushReportDelay = 30000; const pushReportDelay = 30000;
const pullConfigurationOverlayDelay = 60000; const pullConfigurationOverlayDelay = 60000;
function loadRemoteOverlay( let overlayMessageListener = null;
managementEndpoint, instanceId, remoteToken, cachedOverlay, log, cb) { let cachedOverlay = null;
function loadRemoteOverlay(instanceId, remoteToken, log, cb) {
log.debug('loading remote overlay'); log.debug('loading remote overlay');
const opts = { const opts = {
headers: { headers: {
@ -30,29 +31,34 @@ function loadRemoteOverlay(
return cb(error); return cb(error);
} }
if (response.statusCode === 200) { if (response.statusCode === 200) {
return cb(null, cachedOverlay, body); return cb(null, body);
} }
if (response.statusCode === 404) { if (response.statusCode === 404) {
return cb(null, cachedOverlay, {}); return cb(null, {});
} }
return cb(arsenal.errors.AccessForbidden, cachedOverlay, {}); return cb(arsenal.errors.AccessForbidden, {});
}).json(); }).json();
} }
// TODO save only after successful patch // TODO save only after successful patch
function applyConfigurationOverlay( function applyConfigurationOverlay(instanceId, remoteToken, log) {
managementEndpoint, instanceId, remoteToken, log) {
async.waterfall([ async.waterfall([
wcb => loadCachedOverlay(log, wcb), wcb => {
(cachedOverlay, wcb) => patchConfiguration(cachedOverlay, if (cachedOverlay) {
log, wcb), wcb(null, cachedOverlay);
(cachedOverlay, wcb) => } else {
loadRemoteOverlay(managementEndpoint, instanceId, remoteToken, loadCachedOverlay(log, wcb);
cachedOverlay, log, wcb), }
(cachedOverlay, remoteOverlay, wcb) => },
saveConfigurationVersion(cachedOverlay, remoteOverlay, log, wcb), overlay => { cachedOverlay = overlay; },
(remoteOverlay, wcb) => patchConfiguration(remoteOverlay, wcb => loadRemoteOverlay(instanceId, remoteToken,
log, wcb), log, wcb),
remoteOverlay => {
cachedOverlay = remoteOverlay;
if (overlayMessageListener) {
overlayMessageListener(JSON.stringify(cachedOverlay));
}
},
], error => { ], error => {
if (error) { if (error) {
log.error('could not apply managed configuration', log.error('could not apply managed configuration',
@ -60,12 +66,11 @@ function applyConfigurationOverlay(
method: 'applyConfigurationOverlay' }); method: 'applyConfigurationOverlay' });
} }
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay, setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
managementEndpoint, instanceId, remoteToken, instanceId, remoteToken, logger.newRequestLogger());
logger.newRequestLogger());
}); });
} }
function postStats(managementEndpoint, instanceId, remoteToken, next) { function postStats(instanceId, remoteToken, next) {
const toURL = `${managementEndpoint}/${instanceId}/stats`; const toURL = `${managementEndpoint}/${instanceId}/stats`;
const toOptions = { const toOptions = {
headers: { headers: {
@ -99,14 +104,19 @@ function getStats() {
return request(fromURL, fromOptions).json(); return request(fromURL, fromOptions).json();
} }
function pushStats(managementEndpoint, instanceId, remoteToken, next) { function pushStats(instanceId, remoteToken, next) {
if (process.env.PUSH_STATS === 'false') { if (process.env.PUSH_STATS === 'false') {
return; return;
} }
getStats().pipe( getStats()
postStats(managementEndpoint, instanceId, remoteToken, next)); .on('error', err => {
setTimeout(pushStats, pushReportDelay, /* If the management process launches too quick, the cloud server may
managementEndpoint, instanceId, remoteToken); * not be listening yet. */
logger.info('failed to get stats', { err });
})
.pipe(
postStats(instanceId, remoteToken, next));
setTimeout(pushStats, pushReportDelay, instanceId, remoteToken);
} }
/** /**
@ -115,20 +125,23 @@ function pushStats(managementEndpoint, instanceId, remoteToken, next) {
* Periodically polls for configuration updates, and pushes stats at * Periodically polls for configuration updates, and pushes stats at
* a fixed interval. * a fixed interval.
* *
* @param {string} managementEndpoint API endpoint
* @param {string} instanceId UUID of this deployment * @param {string} instanceId UUID of this deployment
* @param {string} remoteToken API authentication token * @param {string} remoteToken API authentication token
* @param {function} newOverlayCallback Function to call once a new overlay is
* loaded (overlay)
* *
* @returns {undefined} * @returns {undefined}
*/ */
function startPollingManagementClient( function startPollingManagementClient(
managementEndpoint, instanceId, remoteToken) { instanceId, remoteToken, newOverlayCallback) {
overlayMessageListener = newOverlayCallback;
metadata.notifyBucketChange(() => { metadata.notifyBucketChange(() => {
pushStats(managementEndpoint, instanceId, remoteToken); pushStats(instanceId, remoteToken);
}); });
pushStats(managementEndpoint, instanceId, remoteToken); pushStats(instanceId, remoteToken);
applyConfigurationOverlay(managementEndpoint, instanceId, remoteToken, applyConfigurationOverlay(instanceId, remoteToken,
logger.newRequestLogger()); logger.newRequestLogger());
} }

View File

@ -4,19 +4,16 @@ const net = require('net');
const request = require('request'); const request = require('request');
const { URL } = require('url'); const { URL } = require('url');
const WebSocket = require('ws'); const WebSocket = require('ws');
const assert = require('assert');
const _config = require('../Config').config; const _config = require('../Config').config;
const logger = require('../utilities/logger'); const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const { isManagementAgentUsed } = require('./agentClient');
const { applyAndSaveOverlay } = require('./configuration');
const { const {
ChannelMessageV0, ChannelMessageV0,
MessageType, MessageType,
} = require('./ChannelMessageV0'); } = require('./ChannelMessageV0');
const { pushEndpoint } = require('./constants');
const { const {
CONFIG_OVERLAY_MESSAGE, CONFIG_OVERLAY_MESSAGE,
@ -65,19 +62,23 @@ function createWSAgent(pushEndpoint, env, log) {
return null; return null;
} }
function pushEndpointUrlFromInstanceId(instanceId) {
return `${pushEndpoint}/${instanceId}/ws`;
}
/** /**
* Starts background task that updates configuration and pushes stats. * Starts background task that updates configuration and pushes stats.
* *
* Receives pushed Websocket messages on configuration updates, and * Receives pushed Websocket messages on configuration updates, and
* sends stat messages in response to API sollicitations. * sends stat messages in response to API sollicitations.
* *
* @param {string} pushEndpoint API endpoint
* @param {string} instanceId UUID of this deployment * @param {string} instanceId UUID of this deployment
* @param {string} token API authentication token * @param {string} token API authentication token
* * @param {function} newOverlayCallback Function to call once a new overlay is
* loaded (overlay)
* @returns {undefined} * @returns {undefined}
*/ */
function startWSManagementClient(pushEndpoint, instanceId, token) { function startWSManagementClient(instanceId, token, newOverlayCallback) {
logger.info('connecting to push server'); logger.info('connecting to push server');
function _logError(error, errorMessage, method) { function _logError(error, errorMessage, method) {
if (error) { if (error) {
@ -86,14 +87,18 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
} }
} }
overlayMessageListener = newOverlayCallback;
const socketsByChannelId = []; const socketsByChannelId = [];
const headers = { const headers = {
'x-instance-authentication-token': token, 'x-instance-authentication-token': token,
}; };
const agent = createWSAgent(pushEndpoint, process.env, logger); const agent = createWSAgent(pushEndpoint, process.env, logger);
const url = `${pushEndpoint}/${instanceId}/ws`; const url = pushEndpointUrlFromInstanceId(instanceId);
const ws = new WebSocket(url, subprotocols, { headers, agent }); const ws = new WebSocket(url, subprotocols, { headers, agent });
let pingTimeout = null; let pingTimeout = null;
function sendPing() { function sendPing() {
@ -112,7 +117,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
if (process.env.PUSH_STATS === 'false') { if (process.env.PUSH_STATS === 'false') {
return; return;
} }
const fromURL = `http://localhost:${_config.port}/_/report`; const fromURL = process.env.STAT_REPORT_URL ||
`http://localhost:${_config.port}/_/report`;
const fromOptions = { const fromOptions = {
headers: { headers: {
'x-scal-report-token': process.env.REPORT_TOKEN, 'x-scal-report-token': process.env.REPORT_TOKEN,
@ -121,7 +127,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}; };
request(fromURL, fromOptions, (err, response, body) => { request(fromURL, fromOptions, (err, response, body) => {
if (err) { if (err) {
_logError(err, 'failed to get metrics report', 'pushStats'); const what = 'failed to get metrics report';
const msg =
`${what} at ${fromURL} with headers ${fromOptions}`;
_logError(err, msg, 'pushStats');
return; return;
} }
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body), ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
@ -204,8 +213,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
metadata.notifyBucketChange(null); metadata.notifyBucketChange(null);
_config.removeListener('browser-access-enabled-change', _config.removeListener('browser-access-enabled-change',
browserAccessChangeHandler); browserAccessChangeHandler);
setTimeout(startWSManagementClient, 10000, pushEndpoint, const timeout = process.env.ORBIT_CONNECTION_TIMEOUT_MS || 10000;
instanceId, token);
setTimeout(startWSManagementClient, timeout, instanceId, token,
newOverlayCallback);
}); });
ws.on('error', err => { ws.on('error', err => {
@ -224,18 +235,13 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}); });
ws.on('message', data => { ws.on('message', data => {
const log = logger.newRequestLogger();
const message = new ChannelMessageV0(data); const message = new ChannelMessageV0(data);
switch (message.getType()) { switch (message.getType()) {
case CONFIG_OVERLAY_MESSAGE: case CONFIG_OVERLAY_MESSAGE:
if (!isManagementAgentUsed()) {
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
} else {
if (overlayMessageListener) { if (overlayMessageListener) {
overlayMessageListener(message.getPayload().toString()); overlayMessageListener(message.getPayload().toString());
} }
}
break; break;
case METRICS_REQUEST_MESSAGE: case METRICS_REQUEST_MESSAGE:
pushStats(); pushStats();
@ -256,13 +262,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}); });
} }
function addOverlayMessageListener(callback) {
assert(typeof callback === 'function');
overlayMessageListener = callback;
}
module.exports = { module.exports = {
createWSAgent, createWSAgent,
startWSManagementClient, startWSManagementClient,
addOverlayMessageListener, pushEndpointUrlFromInstanceId,
}; };

View File

@ -14,11 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
const api = require('./api/api'); const api = require('./api/api');
const data = require('./data/wrapper'); const data = require('./data/wrapper');
const metadata = require('./metadata/wrapper'); const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management'); const { initManagementClient } = require('./management/agentClient');
const {
initManagementClient,
isManagementAgentUsed,
} = require('./management/agentClient');
const routes = arsenal.s3routes.routes; const routes = arsenal.s3routes.routes;
const websiteEndpoints = _config.websiteEndpoints; const websiteEndpoints = _config.websiteEndpoints;
@ -40,7 +36,6 @@ const STATS_INTERVAL = 5; // 5 seconds
const STATS_EXPIRY = 30; // 30 seconds const STATS_EXPIRY = 30; // 30 seconds
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
STATS_EXPIRY); STATS_EXPIRY);
const enableRemoteManagement = true;
class S3Server { class S3Server {
/** /**
@ -153,16 +148,8 @@ class S3Server {
// TODO this should wait for metadata healthcheck to be ok // TODO this should wait for metadata healthcheck to be ok
// TODO only do this in cluster master // TODO only do this in cluster master
if (enableRemoteManagement) {
if (!isManagementAgentUsed()) {
setTimeout(() => {
initManagement(logger.newRequestLogger());
}, 5000);
} else {
initManagementClient(); initManagementClient();
} }
}
}
/* /*
* This exits the running process properly. * This exits the running process properly.

View File

@ -5,15 +5,16 @@ const logger = require('./lib/utilities/logger');
const { initManagement } = require('./lib/management'); const { initManagement } = require('./lib/management');
const _config = require('./lib/Config').config; const _config = require('./lib/Config').config;
const { managementAgentMessageType } = require('./lib/management/agentClient'); const { managementAgentMessageType } = require('./lib/management/agentClient');
const { addOverlayMessageListener } = require('./lib/management/push');
const { saveConfigurationVersion } = require('./lib/management/configuration'); const { saveConfigurationVersion } = require('./lib/management/configuration');
const {
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
WS_STATUS_IDLE,
} = require('./lib/management/constants');
// TODO: auth? // TODO: auth?
// TODO: werelogs with a specific name. // TODO: werelogs with a specific name.
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
class ManagementAgentServer { class ManagementAgentServer {
constructor() { constructor() {
@ -29,25 +30,21 @@ class ManagementAgentServer {
process.on('SIGPIPE', () => {}); process.on('SIGPIPE', () => {});
} }
start(_cb) { start() {
const cb = _cb || function noop() {};
/* Define REPORT_TOKEN env variable needed by the management /* Define REPORT_TOKEN env variable needed by the management
* module. */ * module. */
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|| _config.reportToken || _config.reportToken
|| Uuid.v4(); || Uuid.v4();
initManagement(logger.newRequestLogger(), overlay => { /* The initManegement function retries when it fails. */
let error = null; const log = logger.newRequestLogger();
initManagement(log, this.onNewOverlay.bind(this), (err, overlay) => {
if (overlay) { if (err) {
process.exit(0);
}
this.loadedOverlay = overlay; this.loadedOverlay = overlay;
this.startServer(); this.startServer();
} else {
error = new Error('failed to init management');
}
return cb(error);
}); });
} }
@ -75,8 +72,6 @@ class ManagementAgentServer {
setInterval(this.checkBrokenConnections.bind(this), setInterval(this.checkBrokenConnections.bind(this),
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS); CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
addOverlayMessageListener(this.onNewOverlay.bind(this));
} }
onConnection(socket, request) { onConnection(socket, request) {
@ -154,6 +149,14 @@ class ManagementAgentServer {
return; return;
} }
this.loadedOverlay = remoteOverlayObj; this.loadedOverlay = remoteOverlayObj;
if (this.loadedOverlay.browserAccess) {
if (Boolean(_config.browserAccessEnabled) !==
Boolean(this.loadedOverlay.browserAccess.enabled)) {
_config.browserAccessEnabled =
Boolean(this.loadedOverlay.browserAccess.enabled);
_config.emit('browser-access-enabled-change');
}
}
this.wss.clients.forEach( this.wss.clients.forEach(
this._sendNewOverlayToClient.bind(this) this._sendNewOverlayToClient.bind(this)
); );
@ -166,7 +169,7 @@ class ManagementAgentServer {
logger.info('close broken connection', { logger.info('close broken connection', {
client: client._socket._peername, client: client._socket._peername,
}); });
client.terminate(); client.close(WS_STATUS_IDLE.code, WS_STATUS_IDLE.reason);
return; return;
} }
client.isAlive = false; client.isAlive = false;

View File

@ -89,12 +89,13 @@
"lint_md": "mdlint $(git ls-files '*.md')", "lint_md": "mdlint $(git ls-files '*.md')",
"mem_backend": "S3BACKEND=mem node index.js", "mem_backend": "S3BACKEND=mem node index.js",
"perf": "mocha tests/performance/s3standard.js", "perf": "mocha tests/performance/s3standard.js",
"start": "npm-run-all --parallel start_dmd start_s3server", "start": "npm-run-all --parallel start_dmd start_management start_s3server",
"start_mongo": "npm run cloudserver", "start_mongo": "npm run cloudserver",
"start_mdserver": "node mdserver.js", "start_mdserver": "node mdserver.js",
"start_dataserver": "node dataserver.js", "start_dataserver": "node dataserver.js",
"start_s3server": "node index.js", "start_s3server": "node index.js",
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver", "start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
"start_management": "node managementAgent.js",
"start_utapi": "node lib/utapi/utapi.js", "start_utapi": "node lib/utapi/utapi.js",
"utapi_replay": "node lib/utapi/utapiReplay.js", "utapi_replay": "node lib/utapi/utapiReplay.js",
"management_agent": "node managementAgent.js", "management_agent": "node managementAgent.js",