Compare commits

...

2 Commits

Author SHA1 Message Date
jeremyds 812751a5fb ft: ZENKO-714: management agent client in S3.
Now there is a new process handling the management feature we should add
a websocket client in S3 to receive the overlay. To safely move from the
management code in S3 to the full use of the new management process,
this code is run only if the env variable MANAGEGEMENT_USE_AGENT is
defined and equal to one.
2018-09-13 09:16:05 -07:00
jeremyds 0cee70f3f9 ft: ZENKO-714: export applyAndSaveOverlay
This function will be needed by the management agent client in S3.
Export it and take the opportunity to move it to configuration.js.
2018-09-13 09:16:05 -07:00
5 changed files with 119 additions and 41 deletions

View File

@ -1,14 +1,69 @@
const WebSocket = require('ws');
const logger = require('../utilities/logger');
const _config = require('../Config').config;
const { applyAndSaveOverlay } = require('./configuration');
const managementAgentMessageType = {
/** Message that contains the loaded overlay */
NEW_OVERLAY: 1,
};
const CONNECTION_RETRY_TIMEOUT_MS = 5000;
function initManagementClient() {
const host = _config.managementAgent.host;
const port = _config.managementAgent.port;
const ws = new WebSocket(`ws://${host}:${port}/watch`);
ws.on('open', () => {
logger.info('connected with management agent');
});
ws.on('close', (code, reason) => {
logger.info('disconnected from management agent', { reason });
setTimeout(initManagementClient, CONNECTION_RETRY_TIMEOUT_MS);
});
ws.on('error', error => {
logger.error('error on connection with management agent', { error });
});
ws.on('message', data => {
const log = logger.newRequestLogger();
const msg = JSON.parse(data);
if (msg.payload === undefined) {
log.error('message without payload');
return;
}
if (typeof msg.messageType !== 'number') {
log.error('messageType is not an integer', {
type: typeof msg.messageType,
});
return;
}
switch (msg.messageType) {
case managementAgentMessageType.NEW_OVERLAY:
applyAndSaveOverlay(msg.payload, log);
break;
default:
log.error('new overlay message version without payload');
return;
}
});
}
function isManagementAgentUsed() {
return process.env.MANAGEMENT_USE_AGENT === '1';
}
module.exports = {
managementAgentMessageType,
initManagementClient,
isManagementAgentUsed,
};

View File

@ -318,10 +318,33 @@ function loadCachedOverlay(log, callback) {
});
}
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
log.info('overlay push processed');
});
});
}
module.exports = {
loadCachedOverlay,
managementDatabaseName,
patchConfiguration,
saveConfigurationVersion,
remoteOverlayIsNewer,
applyAndSaveOverlay,
};

View File

@ -80,6 +80,15 @@ function initManagement(log, callback) {
log.info('remote management disabled');
return;
}
/* Temporary check before to fully move to the process management agent. */
if (isManagementAgentUsed() ^ typeof callback === 'function') {
let msg = 'misuse of initManagement function: ';
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
msg += `, callback type: ${typeof callback}`;
throw new Error(msg);
}
async.waterfall([
// eslint-disable-next-line arrow-body-style
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
@ -87,20 +96,30 @@ function initManagement(log, callback) {
cb => metadata.getUUID(log, cb),
(instanceId, cb) => initManagementCredentials(
managementEndpoint, instanceId, log, cb),
(instanceId, token, cb) =>
(instanceId, token, cb) => {
if (!isManagementAgentUsed()) {
cb(null, instanceId, token);
return;
}
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
token, overlay)),
(instanceId, token, overlay, cb) =>
token, overlay));
},
(instanceId, token, overlay, cb) => {
if (!isManagementAgentUsed()) {
cb(null, instanceId, token, overlay);
return;
}
patchConfiguration(overlay, log,
err => cb(err, instanceId, token, overlay)),
err => cb(err, instanceId, token, overlay));
},
], (error, instanceId, token, overlay) => {
if (error) {
log.error('could not initialize remote management, retrying later',
{ error: reshapeExceptionError(error),
method: 'initManagement' });
setTimeout(initManagement,
initRemoteManagementRetryDelay,
logger.newRequestLogger());
initRemoteManagementRetryDelay,
logger.newRequestLogger());
} else {
log.info(`this deployment's Instance ID is ${instanceId}`);
log.end('management init done');

View File

@ -9,20 +9,15 @@ const assert = require('assert');
const _config = require('../Config').config;
const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper');
const { reshapeExceptionError } = arsenal.errorUtils;
const {
patchConfiguration,
saveConfigurationVersion,
} = require('./configuration');
const { isManagementAgentUsed } = require('./agentClient');
const { applyAndSaveOverlay } = require('./configuration');
const {
ChannelMessageV0,
MessageType,
} = require('./ChannelMessageV0');
const { isManagementAgentUsed } = require('./agentClient');
const {
CONFIG_OVERLAY_MESSAGE,
METRICS_REQUEST_MESSAGE,
@ -187,28 +182,6 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
socket.write(payload);
}
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
log.info('overlay push processed');
});
});
}
function browserAccessChangeHandler() {
if (!_config.browserAccessEnabled) {
socketsByChannelId.forEach(s => s.close());

View File

@ -15,6 +15,10 @@ const api = require('./api/api');
const data = require('./data/wrapper');
const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management');
const {
initManagementClient,
isManagementAgentUsed,
} = require('./management/agentClient');
const routes = arsenal.s3routes.routes;
const websiteEndpoints = _config.websiteEndpoints;
@ -149,11 +153,15 @@ class S3Server {
// TODO this should wait for metadata healthcheck to be ok
// TODO only do this in cluster master
setTimeout(() => {
if (enableRemoteManagement) {
initManagement(logger.newRequestLogger());
}
}, 5000);
if (!isManagementAgentUsed()) {
setTimeout(() => {
if (enableRemoteManagement) {
initManagement(logger.newRequestLogger());
}
}, 5000);
} else {
initManagementClient();
}
}
/*