Compare commits
19 Commits
developmen
...
bugfix/wip
Author | SHA1 | Date |
---|---|---|
jeremyds | f8d4a533ff | |
jeremyds | 1f6483e0ee | |
jeremyds | 3ce765c2dc | |
jeremyds | 33d94fc375 | |
jeremyds | 486459749c | |
jeremyds | 9d27b34209 | |
jeremyds | 4ecb9b6717 | |
jeremyds | 5c51b5aa19 | |
jeremyds | 2575134e58 | |
jeremyds | ff79b630da | |
jeremyds | a8bcfda3e4 | |
jeremyds | 3c4a6b8d95 | |
jeremyds | 6fd3c9f264 | |
jeremyds | a9592e7ac3 | |
jeremyds | d9659627cc | |
jeremyds | c1095d2246 | |
jeremyds | 1fbfe90951 | |
jeremyds | b9d4371ff3 | |
Thomas Carmet | 1234e55622 |
27
eve/main.yml
27
eve/main.yml
|
@ -227,6 +227,33 @@ stages:
|
||||||
<<: *global-env
|
<<: *global-env
|
||||||
- Upload: *upload-artifacts
|
- Upload: *upload-artifacts
|
||||||
|
|
||||||
|
management-agent-tests:
|
||||||
|
worker: &s3-pod
|
||||||
|
type: kube_pod
|
||||||
|
path: eve/workers/pod.yaml
|
||||||
|
images:
|
||||||
|
aggressor: eve/workers/build
|
||||||
|
s3: "."
|
||||||
|
vars:
|
||||||
|
aggressorMemLimit: "2Gi"
|
||||||
|
s3MemLimit: "1664Mi"
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- Git: *clone
|
||||||
|
- ShellCommand: *credentials
|
||||||
|
- ShellCommand: *npm-install
|
||||||
|
- ShellCommand:
|
||||||
|
command: |
|
||||||
|
set -ex
|
||||||
|
bash wait_for_local_port.bash 8000 40
|
||||||
|
npm run ft_test
|
||||||
|
<<: *follow-s3-log
|
||||||
|
env:
|
||||||
|
<<: *mongo-vars
|
||||||
|
<<: *global-env
|
||||||
|
- Upload: *upload-artifacts
|
||||||
|
|
||||||
|
|
||||||
file-ft-tests:
|
file-ft-tests:
|
||||||
worker:
|
worker:
|
||||||
type: kube_pod
|
type: kube_pod
|
||||||
|
|
|
@ -908,6 +908,25 @@ class Config extends EventEmitter {
|
||||||
this.outboundProxy.certs = certObj.certs;
|
this.outboundProxy.certs = certObj.certs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.managementAgent = {};
|
||||||
|
this.managementAgent.port = 8010;
|
||||||
|
this.managementAgent.host = 'localhost';
|
||||||
|
if (config.managementAgent !== undefined) {
|
||||||
|
if (config.managementAgent.port !== undefined) {
|
||||||
|
assert(Number.isInteger(config.managementAgent.port)
|
||||||
|
&& config.managementAgent.port > 0,
|
||||||
|
'bad config: managementAgent port must be a positive ' +
|
||||||
|
'integer');
|
||||||
|
this.managementAgent.port = config.managementAgent.port;
|
||||||
|
}
|
||||||
|
if (config.managementAgent.host !== undefined) {
|
||||||
|
assert.strictEqual(typeof config.managementAgent.host, 'string',
|
||||||
|
'bad config: management agent host must ' +
|
||||||
|
'be a string');
|
||||||
|
this.managementAgent.host = config.managementAgent.host;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Ephemeral token to protect the reporting endpoint:
|
// Ephemeral token to protect the reporting endpoint:
|
||||||
// try inherited from parent first, then hardcoded in conf file,
|
// try inherited from parent first, then hardcoded in conf file,
|
||||||
// then create a fresh one as last resort.
|
// then create a fresh one as last resort.
|
||||||
|
@ -915,7 +934,6 @@ class Config extends EventEmitter {
|
||||||
process.env.REPORT_TOKEN ||
|
process.env.REPORT_TOKEN ||
|
||||||
config.reportToken ||
|
config.reportToken ||
|
||||||
uuid.v4().toString();
|
uuid.v4().toString();
|
||||||
this.reportEndpoint = process.env.REPORT_ENDPOINT;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_configureBackends() {
|
_configureBackends() {
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
const WebSocket = require('ws');
|
||||||
|
const logger = require('../utilities/logger');
|
||||||
|
const _config = require('../Config').config;
|
||||||
|
const { applyAndSaveOverlay } = require('./configuration');
|
||||||
|
|
||||||
|
|
||||||
|
const managementAgentMessageType = {
|
||||||
|
/** Message that contains the loaded overlay */
|
||||||
|
NEW_OVERLAY: 1,
|
||||||
|
};
|
||||||
|
|
||||||
|
const CONNECTION_RETRY_TIMEOUT_MS = 5000;
|
||||||
|
|
||||||
|
|
||||||
|
function initManagementClient() {
|
||||||
|
const host = _config.managementAgent.host;
|
||||||
|
const port = _config.managementAgent.port;
|
||||||
|
|
||||||
|
const ws = new WebSocket(`ws://${host}:${port}/watch`);
|
||||||
|
|
||||||
|
ws.on('open', () => {
|
||||||
|
logger.info('connected with management agent');
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on('close', (code, reason) => {
|
||||||
|
logger.info('disconnected from management agent', { reason });
|
||||||
|
setTimeout(initManagementClient, CONNECTION_RETRY_TIMEOUT_MS);
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on('error', error => {
|
||||||
|
logger.error('error on connection with management agent', { error });
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on('message', data => {
|
||||||
|
const log = logger.newRequestLogger();
|
||||||
|
const msg = JSON.parse(data);
|
||||||
|
|
||||||
|
if (msg.payload === undefined) {
|
||||||
|
log.error('message without payload');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (typeof msg.messageType !== 'number') {
|
||||||
|
log.error('messageType is not an integer', {
|
||||||
|
type: typeof msg.messageType,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (msg.messageType) {
|
||||||
|
case managementAgentMessageType.NEW_OVERLAY:
|
||||||
|
applyAndSaveOverlay(msg.payload, log);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
log.error('new overlay message version without payload');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
managementAgentMessageType,
|
||||||
|
initManagementClient,
|
||||||
|
};
|
|
@ -256,12 +256,21 @@ function saveConfigurationVersion(cachedOverlay, remoteOverlay, log, cb) {
|
||||||
metadata.putObjectMD(managementDatabaseName, objName, remoteOverlay,
|
metadata.putObjectMD(managementDatabaseName, objName, remoteOverlay,
|
||||||
{}, log, error => {
|
{}, log, error => {
|
||||||
if (error) {
|
if (error) {
|
||||||
log.error('could not save configuration version',
|
log.error('could not save configuration',
|
||||||
{ configurationVersion: remoteOverlay.version });
|
{ configurationVersion: remoteOverlay.version });
|
||||||
|
cb(error);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
metadata.putObjectMD(managementDatabaseName,
|
metadata.putObjectMD(managementDatabaseName,
|
||||||
latestOverlayVersionKey, remoteOverlay.version, {}, log,
|
latestOverlayVersionKey, remoteOverlay.version, {}, log,
|
||||||
error => cb(error, remoteOverlay));
|
error => {
|
||||||
|
if (error) {
|
||||||
|
log.error('could not save configuration version', {
|
||||||
|
configurationVersion: remoteOverlay.version,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
cb(error, remoteOverlay);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
log.debug('no remote configuration to cache yet');
|
log.debug('no remote configuration to cache yet');
|
||||||
|
@ -300,10 +309,31 @@ function loadCachedOverlay(log, callback) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function applyAndSaveOverlay(overlay, log) {
|
||||||
|
patchConfiguration(overlay, log, err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('could not apply pushed overlay', {
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
saveConfigurationVersion(null, overlay, log, err => {
|
||||||
|
if (err) {
|
||||||
|
log.error('could not cache overlay version', {
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
log.info('overlay push processed');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
loadCachedOverlay,
|
loadCachedOverlay,
|
||||||
managementDatabaseName,
|
managementDatabaseName,
|
||||||
patchConfiguration,
|
patchConfiguration,
|
||||||
saveConfigurationVersion,
|
saveConfigurationVersion,
|
||||||
remoteOverlayIsNewer,
|
remoteOverlayIsNewer,
|
||||||
|
applyAndSaveOverlay,
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
/* From RFC6455 which defines some reserved status code ranges, the range
|
||||||
|
* 4000-4999 are for private use. */
|
||||||
|
const WS_STATUS_IDDLE = {
|
||||||
|
code: 4000,
|
||||||
|
reason: 'does not reply to ping before timeout',
|
||||||
|
};
|
||||||
|
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
WS_STATUS_IDDLE,
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
};
|
|
@ -65,39 +65,47 @@ function startManagementListeners(instanceId, token) {
|
||||||
*
|
*
|
||||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||||
* initialization process
|
* initialization process
|
||||||
|
* @param {function} callback Function to call once the overlay is loaded
|
||||||
|
* (overlay)
|
||||||
*
|
*
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
function initManagement(log) {
|
function initManagement(log, callback) {
|
||||||
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
||||||
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
||||||
|| process.env.S3BACKEND === 'mem') {
|
|| process.env.S3BACKEND === 'mem') {
|
||||||
log.info('remote management disabled');
|
log.info('remote management disabled');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
|
cb => metadata.setup(cb),
|
||||||
cb => initManagementDatabase(log, cb),
|
cb => initManagementDatabase(log, cb),
|
||||||
cb => metadata.getUUID(log, cb),
|
cb => metadata.getUUID(log, cb),
|
||||||
(instanceId, cb) => initManagementCredentials(
|
(instanceId, cb) => initManagementCredentials(
|
||||||
managementEndpoint, instanceId, log, cb),
|
managementEndpoint, instanceId, log, cb),
|
||||||
(instanceId, token, cb) => loadCachedOverlay(log, (err, conf) => {
|
(instanceId, token, cb) => {
|
||||||
if (err) {
|
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
|
||||||
return cb(err);
|
token, overlay));
|
||||||
}
|
},
|
||||||
return patchConfiguration(conf, log,
|
(instanceId, token, overlay, cb) => {
|
||||||
err => cb(err, instanceId, token));
|
patchConfiguration(overlay, log,
|
||||||
}),
|
err => cb(err, instanceId, token, overlay));
|
||||||
], (error, instanceId, token) => {
|
},
|
||||||
|
], (error, instanceId, token, overlay) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
log.error('could not initialize remote management, retrying later',
|
log.error('could not initialize remote management, retrying later',
|
||||||
{ error });
|
{ error });
|
||||||
setTimeout(initManagement,
|
setTimeout(initManagement,
|
||||||
initRemoteManagementRetryDelay,
|
initRemoteManagementRetryDelay,
|
||||||
logger.newRequestLogger());
|
logger.newRequestLogger());
|
||||||
} else {
|
} else {
|
||||||
log.info(`this deployment's Instance ID is ${instanceId}`);
|
log.info(`this deployment's Instance ID is ${instanceId}`);
|
||||||
log.end('management init done');
|
log.end('management init done');
|
||||||
startManagementListeners(instanceId, token);
|
startManagementListeners(instanceId, token);
|
||||||
|
if (callback) {
|
||||||
|
callback(overlay);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,15 +3,12 @@ const net = require('net');
|
||||||
const request = require('request');
|
const request = require('request');
|
||||||
const { URL } = require('url');
|
const { URL } = require('url');
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
const _config = require('../Config').config;
|
const _config = require('../Config').config;
|
||||||
const logger = require('../utilities/logger');
|
const logger = require('../utilities/logger');
|
||||||
const metadata = require('../metadata/wrapper');
|
const metadata = require('../metadata/wrapper');
|
||||||
|
const { applyAndSaveOverlay } = require('./configuration');
|
||||||
const {
|
|
||||||
patchConfiguration,
|
|
||||||
saveConfigurationVersion,
|
|
||||||
} = require('./configuration');
|
|
||||||
|
|
||||||
const {
|
const {
|
||||||
ChannelMessageV0,
|
ChannelMessageV0,
|
||||||
|
@ -28,6 +25,8 @@ const {
|
||||||
const PING_INTERVAL_MS = 10000;
|
const PING_INTERVAL_MS = 10000;
|
||||||
const subprotocols = [ChannelMessageV0.protocolName];
|
const subprotocols = [ChannelMessageV0.protocolName];
|
||||||
|
|
||||||
|
let overlayMessageListener = null;
|
||||||
|
|
||||||
// No wildcard nor cidr/mask match for now
|
// No wildcard nor cidr/mask match for now
|
||||||
function createWSAgent(pushEndpoint, env, log) {
|
function createWSAgent(pushEndpoint, env, log) {
|
||||||
const url = new URL(pushEndpoint);
|
const url = new URL(pushEndpoint);
|
||||||
|
@ -117,8 +116,12 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
request(fromURL, fromOptions, (err, response, body) => {
|
request(fromURL, fromOptions, (err, response, body) => {
|
||||||
|
if (err) {
|
||||||
|
logger.error('failed to push stats', { err });
|
||||||
|
return;
|
||||||
|
}
|
||||||
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
||||||
_logError);
|
_logError);
|
||||||
}).json();
|
}).json();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -133,10 +136,11 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
function receiveChannelData(channelId, payload) {
|
function receiveChannelData(channelId, payload) {
|
||||||
let socket = socketsByChannelId[channelId];
|
let socket = socketsByChannelId[channelId];
|
||||||
if (!socket) {
|
if (!socket) {
|
||||||
socket = net.createConnection({
|
const host = process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST
|
||||||
host: 'localhost',
|
|| 'localhost';
|
||||||
port: _config.port,
|
const port = process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT
|
||||||
});
|
|| _config.port;
|
||||||
|
socket = net.createConnection(port, host);
|
||||||
|
|
||||||
socket.on('data', data => {
|
socket.on('data', data => {
|
||||||
ws.send(ChannelMessageV0.
|
ws.send(ChannelMessageV0.
|
||||||
|
@ -149,6 +153,14 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
socket.on('drain', () => {
|
socket.on('drain', () => {
|
||||||
});
|
});
|
||||||
|
|
||||||
|
socket.on('error', error => {
|
||||||
|
logger.error('failed to connect to S3', {
|
||||||
|
code: error.code,
|
||||||
|
host: error.address,
|
||||||
|
port: error.port,
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
socket.on('end', () => {
|
socket.on('end', () => {
|
||||||
socket.destroy();
|
socket.destroy();
|
||||||
socketsByChannelId[channelId] = null;
|
socketsByChannelId[channelId] = null;
|
||||||
|
@ -161,26 +173,6 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
socket.write(payload);
|
socket.write(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
function applyAndSaveOverlay(overlay, log) {
|
|
||||||
patchConfiguration(overlay, log, err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('could not apply pushed overlay', {
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
saveConfigurationVersion(null, overlay, log, err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('could not cache overlay version', {
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
log.info('overlay push processed');
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
function browserAccessChangeHandler() {
|
function browserAccessChangeHandler() {
|
||||||
if (!_config.browserAccessEnabled) {
|
if (!_config.browserAccessEnabled) {
|
||||||
socketsByChannelId.forEach(s => s.close());
|
socketsByChannelId.forEach(s => s.close());
|
||||||
|
@ -228,7 +220,9 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
|
|
||||||
switch (message.getType()) {
|
switch (message.getType()) {
|
||||||
case CONFIG_OVERLAY_MESSAGE:
|
case CONFIG_OVERLAY_MESSAGE:
|
||||||
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
|
if (overlayMessageListener) {
|
||||||
|
overlayMessageListener(message.getPayload());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case METRICS_REQUEST_MESSAGE:
|
case METRICS_REQUEST_MESSAGE:
|
||||||
pushStats();
|
pushStats();
|
||||||
|
@ -249,7 +243,13 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function addOverlayMessageListener(callback) {
|
||||||
|
assert(typeof callback === 'function');
|
||||||
|
overlayMessageListener = callback;
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
createWSAgent,
|
createWSAgent,
|
||||||
startWSManagementClient,
|
startWSManagementClient,
|
||||||
|
addOverlayMessageListener,
|
||||||
};
|
};
|
||||||
|
|
|
@ -14,7 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
|
||||||
const api = require('./api/api');
|
const api = require('./api/api');
|
||||||
const data = require('./data/wrapper');
|
const data = require('./data/wrapper');
|
||||||
const metadata = require('./metadata/wrapper');
|
const metadata = require('./metadata/wrapper');
|
||||||
const { initManagement } = require('./management');
|
const { initManagementClient } = require('./management/agentClient');
|
||||||
|
|
||||||
const routes = arsenal.s3routes.routes;
|
const routes = arsenal.s3routes.routes;
|
||||||
const websiteEndpoints = _config.websiteEndpoints;
|
const websiteEndpoints = _config.websiteEndpoints;
|
||||||
|
@ -153,11 +153,7 @@ class S3Server {
|
||||||
|
|
||||||
// TODO this should wait for metadata healthcheck to be ok
|
// TODO this should wait for metadata healthcheck to be ok
|
||||||
// TODO only do this in cluster master
|
// TODO only do this in cluster master
|
||||||
setTimeout(() => {
|
initManagementClient();
|
||||||
if (enableRemoteManagement) {
|
|
||||||
initManagement(logger.newRequestLogger());
|
|
||||||
}
|
|
||||||
}, 5000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -0,0 +1,165 @@
|
||||||
|
const Uuid = require('uuid');
|
||||||
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
|
const logger = require('./lib/utilities/logger');
|
||||||
|
const { initManagement } = require('./lib/management');
|
||||||
|
const _config = require('./lib/Config').config;
|
||||||
|
const { managementAgentMessageType } = require('./lib/management/agentClient');
|
||||||
|
const { addOverlayMessageListener } = require('./lib/management/push');
|
||||||
|
const {
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
WS_STATUS_IDDLE,
|
||||||
|
} = require('./lib/management/constants');
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: auth?
|
||||||
|
|
||||||
|
|
||||||
|
class ManagementAgentServer {
|
||||||
|
constructor() {
|
||||||
|
this.port = _config.managementAgent.port || 8010;
|
||||||
|
this.wss = null;
|
||||||
|
this.loadedOverlay = null;
|
||||||
|
|
||||||
|
this.stop = this.stop.bind(this);
|
||||||
|
process.on('SIGINT', this.stop);
|
||||||
|
process.on('SIGHUP', this.stop);
|
||||||
|
process.on('SIGQUIT', this.stop);
|
||||||
|
process.on('SIGTERM', this.stop);
|
||||||
|
process.on('SIGPIPE', () => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
start(_cb) {
|
||||||
|
const cb = _cb || function noop() {};
|
||||||
|
|
||||||
|
/* Define REPORT_TOKEN env variable needed by the management
|
||||||
|
* module. */
|
||||||
|
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|
||||||
|
|| _config.reportToken
|
||||||
|
|| Uuid.v4();
|
||||||
|
|
||||||
|
/* The initManegement function retries when it fails. */
|
||||||
|
return initManagement(logger.newRequestLogger(), overlay => {
|
||||||
|
let error = null;
|
||||||
|
|
||||||
|
if (overlay) {
|
||||||
|
this.loadedOverlay = overlay;
|
||||||
|
this.startServer();
|
||||||
|
} else {
|
||||||
|
error = new Error('failed to init management');
|
||||||
|
}
|
||||||
|
return cb(error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
stop() {
|
||||||
|
if (this.wss) {
|
||||||
|
this.wss.close(() => {
|
||||||
|
logger.info('server shutdown');
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
startServer() {
|
||||||
|
this.wss = new WebSocket.Server({
|
||||||
|
port: this.port,
|
||||||
|
clientTracking: true,
|
||||||
|
path: '/watch',
|
||||||
|
});
|
||||||
|
|
||||||
|
this.wss.on('connection', this.onConnection.bind(this));
|
||||||
|
this.wss.on('listening', this.onListening.bind(this));
|
||||||
|
this.wss.on('error', this.onError.bind(this));
|
||||||
|
|
||||||
|
setInterval(this.checkBrokenConnections.bind(this),
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||||
|
|
||||||
|
addOverlayMessageListener(this.onNewOverlay.bind(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
onConnection(socket, request) {
|
||||||
|
function hearthbeat() {
|
||||||
|
this.isAlive = true;
|
||||||
|
}
|
||||||
|
logger.info('client connected to watch route', {
|
||||||
|
ip: request.connection.remoteAddress,
|
||||||
|
});
|
||||||
|
|
||||||
|
/* eslint-disable no-param-reassign */
|
||||||
|
socket.isAlive = true;
|
||||||
|
socket.on('pong', hearthbeat.bind(socket));
|
||||||
|
|
||||||
|
if (socket.readyState !== socket.OPEN) {
|
||||||
|
logger.error('client socket not in ready state', {
|
||||||
|
state: socket.readyState,
|
||||||
|
client: socket._socket._peername,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const msg = {
|
||||||
|
messageType: managementAgentMessageType.NEW_OVERLAY,
|
||||||
|
payload: this.loadedOverlay,
|
||||||
|
};
|
||||||
|
socket.send(JSON.stringify(msg), error => {
|
||||||
|
if (error) {
|
||||||
|
logger.error('failed to send remoteOverlay to client', {
|
||||||
|
error,
|
||||||
|
client: socket._socket._peername,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
onListening() {
|
||||||
|
logger.info('websocket server listening',
|
||||||
|
{ port: this.port });
|
||||||
|
}
|
||||||
|
|
||||||
|
onError(error) {
|
||||||
|
logger.error('websocket server error', { error });
|
||||||
|
}
|
||||||
|
|
||||||
|
onNewOverlay(remoteOverlay) {
|
||||||
|
this.loadedOverlay = remoteOverlay;
|
||||||
|
this.wss.clients.forEach(client => {
|
||||||
|
if (client.readyState !== client.OPEN) {
|
||||||
|
logger.error('client socket not in ready state', {
|
||||||
|
state: client.readyState,
|
||||||
|
client: client._socket._peername,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const msg = {
|
||||||
|
messageType: managementAgentMessageType.NEW_OVERLAY,
|
||||||
|
payload: remoteOverlay,
|
||||||
|
};
|
||||||
|
client.send(JSON.stringify(msg), error => {
|
||||||
|
if (error) {
|
||||||
|
logger.error('failed to send remoteOverlay to management' +
|
||||||
|
' agent client', {
|
||||||
|
error,
|
||||||
|
client: client._socket._peername,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
checkBrokenConnections() {
|
||||||
|
this.wss.clients.forEach(client => {
|
||||||
|
if (!client.isAlive) {
|
||||||
|
logger.info('close broken connection', {
|
||||||
|
client: client._socket._peername,
|
||||||
|
});
|
||||||
|
client.close(WS_STATUS_IDDLE.code, WS_STATUS_IDDLE.reason);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
client.isAlive = false;
|
||||||
|
client.ping();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const server = new ManagementAgentServer();
|
||||||
|
server.start();
|
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"name": "@zenko/cloudserver",
|
"name": "@zenko/cloudserver",
|
||||||
"version": "8.0.0-beta",
|
"version": "8.1.0-beta",
|
||||||
"description": "Zenko CloudServer, an open-source Node.js implementation of a server handling the Amazon S3 protocol",
|
"description": "Zenko CloudServer, an open-source Node.js implementation of a server handling the Amazon S3 protocol",
|
||||||
"main": "index.js",
|
"main": "index.js",
|
||||||
"engines": {
|
"engines": {
|
||||||
|
@ -84,6 +84,7 @@
|
||||||
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
|
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
|
||||||
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
|
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
|
||||||
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
|
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
|
||||||
|
"ft_management_agent": "cd tests/functional/managementAgent/ && npm test",
|
||||||
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
|
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
|
||||||
"lint": "eslint $(git ls-files '*.js')",
|
"lint": "eslint $(git ls-files '*.js')",
|
||||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||||
|
@ -97,6 +98,7 @@
|
||||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||||
"start_utapi": "node lib/utapi/utapi.js",
|
"start_utapi": "node lib/utapi/utapi.js",
|
||||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
||||||
|
"management_agent": "node managementAgent.js",
|
||||||
"test": "CI=true S3BACKEND=mem mocha --recursive tests/unit",
|
"test": "CI=true S3BACKEND=mem mocha --recursive tests/unit",
|
||||||
"test_legacy_location": "CI=true S3_LOCATION_FILE=tests/locationConfig/locationConfigLegacy.json S3BACKEND=mem mocha --recursive tests/unit",
|
"test_legacy_location": "CI=true S3_LOCATION_FILE=tests/locationConfig/locationConfigLegacy.json S3BACKEND=mem mocha --recursive tests/unit",
|
||||||
"multiple_backend_test": "CI=true S3BACKEND=mem S3DATA=multiple mocha -t 20000 --recursive tests/multipleBackend",
|
"multiple_backend_test": "CI=true S3BACKEND=mem S3DATA=multiple mocha -t 20000 --recursive tests/multipleBackend",
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
[default]
|
||||||
|
region = us-east-1
|
|
@ -0,0 +1,3 @@
|
||||||
|
[default]
|
||||||
|
aws_access_key_id = Y1VC0G0ABJN0R6CY04N2
|
||||||
|
aws_secret_access_key = e8yosUHlaY46jXPDVKsAttw5StKR0HRtGBePPGBW
|
|
@ -0,0 +1,123 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
const WebSocket = require('ws');
|
||||||
|
const assert = require('assert');
|
||||||
|
|
||||||
|
const logger = require('../../../lib/utilities/logger');
|
||||||
|
const _config = require('../../../lib/Config').config;
|
||||||
|
const {
|
||||||
|
managementAgentMessageType,
|
||||||
|
} = require('../../../lib/management/agentClient');
|
||||||
|
const {
|
||||||
|
patchConfiguration,
|
||||||
|
} = require('../../../lib/management/configuration');
|
||||||
|
const {
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
WS_STATUS_IDDLE,
|
||||||
|
} = require('../../../lib/management/constants');
|
||||||
|
|
||||||
|
const metadata = require('../../../lib/metadata/wrapper.js');
|
||||||
|
|
||||||
|
function createWs(path) {
|
||||||
|
const host = _config.managementAgent.host;
|
||||||
|
const port = _config.managementAgent.port;
|
||||||
|
return new WebSocket(`ws://${host}:${port}/${path || 'watch'}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('Management process', function testSuite() {
|
||||||
|
this.timeout(120000);
|
||||||
|
|
||||||
|
it('should setup metada', done => {
|
||||||
|
metadata.setup(done);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not listen on others routes than `watch`', done => {
|
||||||
|
const ws = createWs('wrong_path');
|
||||||
|
const msg = 'management agent process should not listen this route';
|
||||||
|
|
||||||
|
ws.on('open', () => {
|
||||||
|
logger.error('open');
|
||||||
|
return done(new Error(msg));
|
||||||
|
});
|
||||||
|
ws.on('error', error => {
|
||||||
|
logger.error('error', { error });
|
||||||
|
return done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should listen on `watch` route', done => {
|
||||||
|
const ws = createWs();
|
||||||
|
|
||||||
|
const msg = 'management agent process should listen this route';
|
||||||
|
ws.on('open', done);
|
||||||
|
ws.on('error', () => { done(new Error(msg)); });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send the loaded overlay as first message', done => {
|
||||||
|
let firstMsgReceived = false;
|
||||||
|
|
||||||
|
const ws = createWs();
|
||||||
|
|
||||||
|
ws.on('close', done);
|
||||||
|
ws.on('error', () => { done(new Error('connection error')); });
|
||||||
|
ws.on('message', data => {
|
||||||
|
if (!firstMsgReceived) {
|
||||||
|
firstMsgReceived = true;
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const msg = JSON.parse(data);
|
||||||
|
assert.strictEqual(
|
||||||
|
msg.messageType, managementAgentMessageType.NEW_OVERLAY
|
||||||
|
);
|
||||||
|
assert(msg.payload);
|
||||||
|
|
||||||
|
patchConfiguration(msg.payload, logger.newRequestLogger(), done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send the new overlay after the first message', done => {
|
||||||
|
const ws = createWs();
|
||||||
|
let firstMsgReceived = false;
|
||||||
|
|
||||||
|
ws.on('close', done);
|
||||||
|
ws.on('error', () => { done(new Error('connection error')); });
|
||||||
|
ws.on('message', data => {
|
||||||
|
if (!firstMsgReceived) {
|
||||||
|
firstMsgReceived = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
const msg = JSON.parse(data);
|
||||||
|
assert.strictEqual(
|
||||||
|
msg.messageType, managementAgentMessageType.NEW_OVERLAY
|
||||||
|
);
|
||||||
|
assert(msg.payload);
|
||||||
|
|
||||||
|
patchConfiguration(msg.payload, logger.newRequestLogger(), done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should terminate the connection when a client does not answer ping',
|
||||||
|
done => {
|
||||||
|
this.timeout(2 * CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||||
|
|
||||||
|
const ws = createWs();
|
||||||
|
|
||||||
|
ws.on('close', (code, reason) => {
|
||||||
|
assert.strictEqual(code, WS_STATUS_IDDLE.code);
|
||||||
|
assert.strictEqual(reason, WS_STATUS_IDDLE.reason);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
ws.on('error', error => {
|
||||||
|
done(new Error('unexpected event'));
|
||||||
|
});
|
||||||
|
ws.on('message', data => {
|
||||||
|
/* Ugly eventTarget internal fields hacking to avoid this web
|
||||||
|
* socket to answer to ping messages. It will make the management
|
||||||
|
* agent to close the connection after a timeout.
|
||||||
|
* Defining an onPing event does not help, this internal function
|
||||||
|
* is still called. */
|
||||||
|
ws._receiver._events.ping = function noop() {};
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,6 @@
|
||||||
|
{
|
||||||
|
"tests": {
|
||||||
|
"files": [ "/test" ],
|
||||||
|
"on": "aggressor"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
{
|
||||||
|
"name": "management-agent-tests",
|
||||||
|
"scripts": {
|
||||||
|
"test": "mocha -t 40000 *.js"
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue