Compare commits

...

11 Commits

Author SHA1 Message Date
jeremyds 240d16a280 ft: ZENKO-716: tests 2018-11-12 10:56:20 -08:00
jeremyds ce8e01f37c ft: ZENKO-716: management process, handle browserAccess change
The overlay sent by orbit to Zenko contains a "browserAccess" field.
When this parameter is set to false, payload message sent by Orbit
should not be forwarded. With the code move from the cloudserver to the
new management process, this value should be handled and the appropriate
configuration event should be emitted on value change.
2018-11-01 13:50:22 -07:00
jeremyds 81181892d5 ft: ZENKO-716: fix infinite loop in initManagement.
The initManagement function has an automatic retry when it fails. The
problem is that when an INITIAL_INSTANCE_ID environment variable is
passed and there is already an instance ID saved in metadata and these
two values are different, the initManagement will retry infinitely.

This commit catches this situation and make the initManagement function
stop to retry and instead it raised an error to upper level. This error
is now caught in upper level and leads to a process exit.
2018-11-01 13:50:22 -07:00
jeremyds b3b371d597 ft: ZENKO-716: fix setTimeout 2018-11-01 13:50:22 -07:00
jeremyds 1804d8b10a ft: ZENKO-716: endpoint value 2018-11-01 13:50:22 -07:00
jeremyds 84c51bdac8 ft: ZENKO-716: new management process poll mode
This commit fixes the management pull mode of the new management
process. We don't need anymore to save and patch the overlay into
lib/management/poll.

Furthermore, like for the push mode, a callback should be fired when a
new overlay is received, to notify the upper layer. The initManagement
takes this new callback as parameter and the previous listener
registering in lib/management/push is no longer required.
2018-11-01 13:50:22 -07:00
jeremyds 9f6c5fac73 ft: ZENKO-716: rename env variablle for coherence. 2018-11-01 13:50:22 -07:00
jeremyds 5a578b3ef0 ft: ZENKO-716: management, use constant and env variables for tests 2018-11-01 13:50:22 -07:00
jeremyds 5dc99e24aa ft: ZENKO-716: add management message encoding helper function
Testing the management process feature will require to send this kind of
message.
2018-11-01 13:50:22 -07:00
jeremyds 052628a9d4 ft: ZENKO-716: mninor error log improvement
Add the url and the headers in the error log used when the stat push
request from management to orbit fails.
2018-11-01 13:50:22 -07:00
jeremyds 37a4fc30e0 ft: ZENKO-716: stop using management code in S3 2018-11-01 13:50:22 -07:00
19 changed files with 872 additions and 177 deletions

View File

@ -26,3 +26,4 @@ ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
CMD [ "npm", "start" ] CMD [ "npm", "start" ]
EXPOSE 8000 EXPOSE 8000
EXPOSE 9753

View File

@ -65,6 +65,16 @@ models:
S3BACKEND: "mem" S3BACKEND: "mem"
MPU_TESTING: "yes" MPU_TESTING: "yes"
S3METADATA: mongodb S3METADATA: mongodb
- env: &management-vars
S3BACKEND: "file"
S3METADATA: mongodb
PUSH_ENDPOINT: "http://localhost:9989/endpoint"
STAT_REPORT_URL: "http://localhost:8081/stat"
SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST: "localhost"
SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT: "8082"
MANAGEMENT_CHECK_CLIENT_FQCY_MS: "2000"
ORBIT_CONNECTION_TIMEOUT_MS: "1000"
MANAGEMENT_MODE: "push"
- env: &multiple-backend-vars - env: &multiple-backend-vars
S3BACKEND: "mem" S3BACKEND: "mem"
S3DATA: "multiple" S3DATA: "multiple"
@ -121,6 +131,7 @@ stages:
- file-ft-tests - file-ft-tests
- multiple-backend-test - multiple-backend-test
- mongo-ft-tests - mongo-ft-tests
- management-agent-tests
waitForFinish: True waitForFinish: True
haltOnFailure: True haltOnFailure: True
@ -165,6 +176,7 @@ stages:
vars: vars:
aggressorMemLimit: "2Gi" aggressorMemLimit: "2Gi"
s3MemLimit: "2Gi" s3MemLimit: "2Gi"
managementProcess: disabled
env: env:
<<: *multiple-backend-vars <<: *multiple-backend-vars
<<: *global-env <<: *global-env
@ -209,6 +221,7 @@ stages:
aggressorMemLimit: "2Gi" aggressorMemLimit: "2Gi"
s3MemLimit: "1664Mi" s3MemLimit: "1664Mi"
redis: enabled redis: enabled
managementProcess: disabled
env: env:
<<: *mongo-vars <<: *mongo-vars
<<: *global-env <<: *global-env
@ -227,6 +240,37 @@ stages:
<<: *global-env <<: *global-env
- Upload: *upload-artifacts - Upload: *upload-artifacts
management-agent-tests:
worker: &management-pod
type: kube_pod
path: eve/workers/pod.yaml
images:
aggressor: eve/workers/build
management_agent: "."
vars:
aggressorMemLimit: "1Gi"
managementProcess: enabled
env:
<<: *management-vars
<<: *global-env
steps:
- Git: *clone
- ShellCommand: *credentials
- ShellCommand: *npm-install
- ShellCommand:
command: |
bash -c "
set -ex
bash wait_for_local_port.bash 9753 40
source /artifacts/management_test_instance_id
npm run ft_management_agent"
env:
<<: *mongo-vars
<<: *management-vars
<<: *global-env
- Upload: *upload-artifacts
file-ft-tests: file-ft-tests:
worker: worker:
type: kube_pod type: kube_pod
@ -238,6 +282,7 @@ stages:
aggressorMemLimit: "1920Mi" aggressorMemLimit: "1920Mi"
s3MemLimit: "2Gi" s3MemLimit: "2Gi"
redis: enabled redis: enabled
managementProcess: disabled
env: env:
<<: *file-mem-mpu <<: *file-mem-mpu
<<: *global-env <<: *global-env

View File

@ -40,12 +40,13 @@ spec:
readOnly: false readOnly: false
mountPath: /root/.aws mountPath: /root/.aws
- name: artifacts - name: artifacts
readOnly: true readOnly: false
mountPath: /artifacts mountPath: /artifacts
command: command:
- bash - bash
- -lc - -lc
- | - |
echo "export INITIAL_INSTANCE_ID=$(od -x /dev/urandom | head -1 | awk '{OFS="-"; print $2$3,$4,$5,$6,$7$8$9}')" > /artifacts/management_test_instance_id
buildbot-worker create-worker . $BUILDMASTER:$BUILDMASTER_PORT $WORKERNAME $WORKERPASS buildbot-worker create-worker . $BUILDMASTER:$BUILDMASTER_PORT $WORKERNAME $WORKERPASS
buildbot-worker start --nodaemon buildbot-worker start --nodaemon
env: env:
@ -61,6 +62,7 @@ spec:
- name: {{ key }} - name: {{ key }}
value: "{{ value }}" value: "{{ value }}"
{% endfor %} {% endfor %}
{% if vars.managementProcess is not defined or vars.managementProcess != "enabled" -%}
- name: s3 - name: s3
image: {{ images.s3 }} image: {{ images.s3 }}
imagePullPolicy: IfNotPresent imagePullPolicy: IfNotPresent
@ -114,6 +116,64 @@ spec:
- name: {{ key }} - name: {{ key }}
value: "{{ value }}" value: "{{ value }}"
{% endfor %} {% endfor %}
{%- endif %}
{% if vars.managementProcess is defined and vars.managementProcess == "enabled" -%}
- name: management
image: {{ images.management_agent }}
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 100m
memory: 1Gi
limits:
cpu: 200m
memory: 1Gi
volumeMounts:
- name: creds
readOnly: false
mountPath: /root/.aws
- name: certs
readOnly: true
mountPath: /tmp
- name: artifacts
readOnly: false
mountPath: /artifacts
command:
- bash
- -ec
- |
sleep 10 # wait for mongo
source /artifacts/management_test_instance_id
/usr/src/app/docker-entrypoint.sh npm run management_agent | tee -a /artifacts/management.log
env:
{% if vars.env.S3DATA is defined and vars.env.S3DATA == "multiple" -%}
- name: S3_LOCATION_FILE
value: "/usr/src/app/tests/locationConfig/locationConfigTests.json"
{%- endif %}
- name: CI
value: "true"
- name: ENABLE_LOCAL_CACHE
value: "true"
- name: MONGODB_HOSTS
value: "localhost:27018"
- name: MONGODB_RS
value: "rs0"
- name: REDIS_HOST
value: "localhost"
- name: REDIS_PORT
value: "6379"
- name: REPORT_TOKEN
value: "report-token-1"
- name: REMOTE_MANAGEMENT_DISABLE
value: "0"
- name: HEALTHCHECKS_ALLOWFROM
value: "0.0.0.0/0"
{% for key, value in vars.env.items() %}
- name: {{ key }}
value: "{{ value }}"
{% endfor %}
{%- endif %}
{% if vars.redis is defined and vars.redis == "enabled" -%} {% if vars.redis is defined and vars.redis == "enabled" -%}
- name: redis - name: redis
image: redis:alpine image: redis:alpine

View File

@ -951,7 +951,7 @@ class Config extends EventEmitter {
} }
this.managementAgent = {}; this.managementAgent = {};
this.managementAgent.port = 8010; this.managementAgent.port = 9753;
this.managementAgent.host = 'localhost'; this.managementAgent.host = 'localhost';
if (config.managementAgent !== undefined) { if (config.managementAgent !== undefined) {
if (config.managementAgent.port !== undefined) { if (config.managementAgent.port !== undefined) {

View File

@ -100,7 +100,7 @@ class ChannelMessageV0 {
} }
/** /**
* Creates a wire representation of a metrics message * Creates a wire representation of a metrics report message
* *
* @param {object} body Metrics report * @param {object} body Metrics report
* *
@ -116,6 +116,40 @@ class ChannelMessageV0 {
return buf; return buf;
} }
/**
* Creates a wire representation of a configuration overlay message
*
* @param {object} body configuration overlay
*
* @returns {Buffer} wire representation
*/
static encodeConfigOverlayMessage(body) {
const overlay = JSON.stringify(body);
const buf = Buffer.alloc(overlay.length + headerSize);
buf.writeUInt8(MessageType.CONFIG_OVERLAY_MESSAGE, 0);
buf.writeUInt8(0, 1);
buf.writeUInt8(TargetType.TARGET_ANY, 2);
buf.write(overlay, headerSize);
return buf;
}
/**
* Creates a wire representation of a metric request message
*
* @param {object} body Metrics request
*
* @returns {Buffer} wire representation
*/
static encodeMetricsRequestMessage(body) {
const request = JSON.stringify(body);
const buf = Buffer.alloc(request.length + headerSize);
buf.writeUInt8(MessageType.METRICS_REQUEST_MESSAGE, 0);
buf.writeUInt8(0, 1);
buf.writeUInt8(TargetType.TARGET_ANY, 2);
buf.write(request, headerSize);
return buf;
}
/** /**
* Protocol name used for subprotocol negociation * Protocol name used for subprotocol negociation
*/ */

View File

@ -82,13 +82,7 @@ function initManagementClient() {
}); });
} }
function isManagementAgentUsed() {
return process.env.MANAGEMENT_USE_AGENT === '1';
}
module.exports = { module.exports = {
managementAgentMessageType, managementAgentMessageType,
initManagementClient, initManagementClient,
isManagementAgentUsed,
}; };

View File

@ -318,33 +318,10 @@ function loadCachedOverlay(log, callback) {
}); });
} }
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: reshapeExceptionError(err),
method: 'applyAndSaveOverlay',
});
return;
}
log.info('overlay push processed');
});
});
}
module.exports = { module.exports = {
loadCachedOverlay, loadCachedOverlay,
managementDatabaseName, managementDatabaseName,
patchConfiguration, patchConfiguration,
saveConfigurationVersion, saveConfigurationVersion,
remoteOverlayIsNewer, remoteOverlayIsNewer,
applyAndSaveOverlay,
}; };

View File

@ -0,0 +1,29 @@
/* From RFC6455 which defines some reserved status code ranges, the range
* 4000-4999 are for private use. */
const WS_STATUS_IDLE = {
code: 4000,
reason: 'does not reply to ping before timeout',
};
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS =
process.env.MANAGEMENT_CHECK_CLIENT_FQCY_MS || 15000;
const endpointPath = 'api/v1/instance';
const managementEndpointRoot =
process.env.MANAGEMENT_ENDPOINT ||
'https://api.zenko.io';
const managementEndpoint = `${managementEndpointRoot}/${endpointPath}`;
const pushEndpointRoot =
process.env.PUSH_ENDPOINT ||
'https://push.api.zenko.io';
const pushEndpoint = `${pushEndpointRoot}/${endpointPath}`;
module.exports = {
WS_STATUS_IDLE,
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
managementEndpoint,
pushEndpoint,
};

View File

@ -3,6 +3,7 @@ const forge = require('node-forge');
const request = require('request'); const request = require('request');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { managementEndpoint } = require('./constants');
const managementDatabaseName = 'PENSIEVE'; const managementDatabaseName = 'PENSIEVE';
const tokenConfigurationKey = 'auth/zenko/remote-management-token'; const tokenConfigurationKey = 'auth/zenko/remote-management-token';
@ -25,7 +26,7 @@ function getStoredCredentials(log, callback) {
log, callback); log, callback);
} }
function issueCredentials(managementEndpoint, instanceId, log, callback) { function issueCredentials(instanceId, log, callback) {
log.info('registering with API to get token'); log.info('registering with API to get token');
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 }); const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
@ -54,8 +55,7 @@ function issueCredentials(managementEndpoint, instanceId, log, callback) {
}).json(postData); }).json(postData);
} }
function confirmInstanceCredentials( function confirmInstanceCredentials(instanceId, creds, log, callback) {
managementEndpoint, instanceId, creds, log, callback) {
const opts = { const opts = {
headers: { headers: {
'x-instance-authentication-token': creds.token, 'x-instance-authentication-token': creds.token,
@ -84,7 +84,6 @@ function confirmInstanceCredentials(
* is registered as new against the Orbit API with newly-generated * is registered as new against the Orbit API with newly-generated
* RSA key pair. * RSA key pair.
* *
* @param {string} managementEndpoint API endpoint
* @param {string} instanceId UUID of this deployment * @param {string} instanceId UUID of this deployment
* @param {werelogs~Logger} log Request-scoped logger to be able to trace * @param {werelogs~Logger} log Request-scoped logger to be able to trace
* initialization process * initialization process
@ -92,12 +91,11 @@ function confirmInstanceCredentials(
* *
* @returns {undefined} * @returns {undefined}
*/ */
function initManagementCredentials( function initManagementCredentials(instanceId, log, callback) {
managementEndpoint, instanceId, log, callback) {
getStoredCredentials(log, (error, value) => { getStoredCredentials(log, (error, value) => {
if (error) { if (error) {
if (error.NoSuchKey) { if (error.NoSuchKey) {
return issueCredentials(managementEndpoint, instanceId, log, return issueCredentials(instanceId, log,
(error, value) => { (error, value) => {
if (error) { if (error) {
log.error('could not issue token', log.error('could not issue token',
@ -118,8 +116,7 @@ function initManagementCredentials(
log.info('saved token locally, ' + log.info('saved token locally, ' +
'confirming instance'); 'confirming instance');
return confirmInstanceCredentials( return confirmInstanceCredentials(
managementEndpoint, instanceId, value, log, instanceId, value, log, callback);
callback);
}); });
}); });
} }

View File

@ -1,4 +1,5 @@
const arsenal = require('arsenal'); const arsenal = require('arsenal');
const errors = arsenal.errors;
const async = require('async'); const async = require('async');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
@ -13,19 +14,9 @@ const { initManagementCredentials } = require('./credentials');
const { startWSManagementClient } = require('./push'); const { startWSManagementClient } = require('./push');
const { startPollingManagementClient } = require('./poll'); const { startPollingManagementClient } = require('./poll');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const { isManagementAgentUsed } = require('./agentClient');
const initRemoteManagementRetryDelay = 10000; const initRemoteManagementRetryDelay = 10000;
const managementEndpointRoot =
process.env.MANAGEMENT_ENDPOINT ||
'https://api.zenko.io';
const managementEndpoint = `${managementEndpointRoot}/api/v1/instance`;
const pushEndpointRoot =
process.env.PUSH_ENDPOINT ||
'https://push.api.zenko.io';
const pushEndpoint = `${pushEndpointRoot}/api/v1/instance`;
function initManagementDatabase(log, callback) { function initManagementDatabase(log, callback) {
// XXX choose proper owner names // XXX choose proper owner names
@ -48,12 +39,12 @@ function initManagementDatabase(log, callback) {
}); });
} }
function startManagementListeners(instanceId, token) { function startManagementListeners(instanceId, newOverlayCallback, token) {
const mode = process.env.MANAGEMENT_MODE || 'push'; const mode = process.env.MANAGEMENT_MODE || 'push';
if (mode === 'push') { if (mode === 'push') {
startWSManagementClient(pushEndpoint, instanceId, token); startWSManagementClient(instanceId, token, newOverlayCallback);
} else { } else {
startPollingManagementClient(managementEndpoint, instanceId, token); startPollingManagementClient(instanceId, token, newOverlayCallback);
} }
} }
@ -66,14 +57,15 @@ function startManagementListeners(instanceId, token) {
* - loading and applying the latest cached overlay configuration * - loading and applying the latest cached overlay configuration
* - starting a configuration update and metrics push background task * - starting a configuration update and metrics push background task
* *
* @param {werelogs~Logger} log Request-scoped logger to be able to trace * @param {werelogs~Logger} log Request-scoped logger to be able to
* initialization process * trace initialization process
* @param {function} callback Function to call once the overlay is loaded * @param {function} newOverlayCallback Function to call once a new overlay
* (overlay) * is received (overlay)
* * @param {function} callback Function to call once the overlay
* is loaded (overlay)
* @returns {undefined} * @returns {undefined}
*/ */
function initManagement(log, callback) { function initManagement(log, newOverlayCallback, callback) {
if ((process.env.REMOTE_MANAGEMENT_DISABLE && if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
process.env.REMOTE_MANAGEMENT_DISABLE !== '0') process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|| process.env.S3BACKEND === 'mem') { || process.env.S3BACKEND === 'mem') {
@ -81,34 +73,28 @@ function initManagement(log, callback) {
return; return;
} }
/* Temporary check before to fully move to the process management agent. */
if (isManagementAgentUsed() ^ typeof callback === 'function') {
let msg = 'misuse of initManagement function: ';
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
msg += `, callback type: ${typeof callback}`;
throw new Error(msg);
}
async.waterfall([ async.waterfall([
// eslint-disable-next-line arrow-body-style cb => metadata.setup(cb),
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
cb => initManagementDatabase(log, cb), cb => initManagementDatabase(log, cb),
cb => metadata.getUUID(log, cb), cb => metadata.getUUID(log, cb),
(instanceId, cb) => initManagementCredentials( (instanceId, cb) => {
managementEndpoint, instanceId, log, cb), const initialID = process.env.INITIAL_INSTANCE_ID;
(instanceId, token, cb) => { if (initialID && initialID !== instanceId) {
if (!isManagementAgentUsed()) { log.error('INITIAL_INSTANCE_ID value is different from ' +
cb(null, instanceId, token, {}); 'instance id saved in metadata', {
return; envVarValue: initialID,
metadataValue: instanceId,
});
return callback(errors.InvalidParameterValue);
} }
return cb(null, instanceId);
},
(instanceId, cb) => initManagementCredentials(instanceId, log, cb),
(instanceId, token, cb) => {
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId, loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
token, overlay)); token, overlay));
}, },
(instanceId, token, overlay, cb) => { (instanceId, token, overlay, cb) => {
if (!isManagementAgentUsed()) {
cb(null, instanceId, token, overlay);
return;
}
patchConfiguration(overlay, log, patchConfiguration(overlay, log,
err => cb(err, instanceId, token, overlay)); err => cb(err, instanceId, token, overlay));
}, },
@ -119,13 +105,16 @@ function initManagement(log, callback) {
method: 'initManagement' }); method: 'initManagement' });
setTimeout(initManagement, setTimeout(initManagement,
initRemoteManagementRetryDelay, initRemoteManagementRetryDelay,
logger.newRequestLogger()); logger.newRequestLogger(),
newOverlayCallback,
callback
);
} else { } else {
log.info(`this deployment's Instance ID is ${instanceId}`); log.info(`this deployment's Instance ID is ${instanceId}`);
log.end('management init done'); log.end('management init done');
startManagementListeners(instanceId, token); startManagementListeners(instanceId, newOverlayCallback, token);
if (callback) { if (callback) {
callback(overlay); callback(null, overlay);
} }
} }
}); });

View File

@ -7,16 +7,17 @@ const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { const {
loadCachedOverlay, loadCachedOverlay,
patchConfiguration,
saveConfigurationVersion,
} = require('./configuration'); } = require('./configuration');
const { managementEndpoint } = require('./constants');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const pushReportDelay = 30000; const pushReportDelay = 30000;
const pullConfigurationOverlayDelay = 60000; const pullConfigurationOverlayDelay = 60000;
function loadRemoteOverlay( let overlayMessageListener = null;
managementEndpoint, instanceId, remoteToken, cachedOverlay, log, cb) { let cachedOverlay = null;
function loadRemoteOverlay(instanceId, remoteToken, log, cb) {
log.debug('loading remote overlay'); log.debug('loading remote overlay');
const opts = { const opts = {
headers: { headers: {
@ -30,29 +31,34 @@ function loadRemoteOverlay(
return cb(error); return cb(error);
} }
if (response.statusCode === 200) { if (response.statusCode === 200) {
return cb(null, cachedOverlay, body); return cb(null, body);
} }
if (response.statusCode === 404) { if (response.statusCode === 404) {
return cb(null, cachedOverlay, {}); return cb(null, {});
} }
return cb(arsenal.errors.AccessForbidden, cachedOverlay, {}); return cb(arsenal.errors.AccessForbidden, {});
}).json(); }).json();
} }
// TODO save only after successful patch // TODO save only after successful patch
function applyConfigurationOverlay( function applyConfigurationOverlay(instanceId, remoteToken, log) {
managementEndpoint, instanceId, remoteToken, log) {
async.waterfall([ async.waterfall([
wcb => loadCachedOverlay(log, wcb), wcb => {
(cachedOverlay, wcb) => patchConfiguration(cachedOverlay, if (cachedOverlay) {
log, wcb), wcb(null, cachedOverlay);
(cachedOverlay, wcb) => } else {
loadRemoteOverlay(managementEndpoint, instanceId, remoteToken, loadCachedOverlay(log, wcb);
cachedOverlay, log, wcb), }
(cachedOverlay, remoteOverlay, wcb) => },
saveConfigurationVersion(cachedOverlay, remoteOverlay, log, wcb), overlay => { cachedOverlay = overlay; },
(remoteOverlay, wcb) => patchConfiguration(remoteOverlay, wcb => loadRemoteOverlay(instanceId, remoteToken,
log, wcb), log, wcb),
remoteOverlay => {
cachedOverlay = remoteOverlay;
if (overlayMessageListener) {
overlayMessageListener(JSON.stringify(cachedOverlay));
}
},
], error => { ], error => {
if (error) { if (error) {
log.error('could not apply managed configuration', log.error('could not apply managed configuration',
@ -60,12 +66,11 @@ function applyConfigurationOverlay(
method: 'applyConfigurationOverlay' }); method: 'applyConfigurationOverlay' });
} }
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay, setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
managementEndpoint, instanceId, remoteToken, instanceId, remoteToken, logger.newRequestLogger());
logger.newRequestLogger());
}); });
} }
function postStats(managementEndpoint, instanceId, remoteToken, next) { function postStats(instanceId, remoteToken, next) {
const toURL = `${managementEndpoint}/${instanceId}/stats`; const toURL = `${managementEndpoint}/${instanceId}/stats`;
const toOptions = { const toOptions = {
headers: { headers: {
@ -99,14 +104,19 @@ function getStats() {
return request(fromURL, fromOptions).json(); return request(fromURL, fromOptions).json();
} }
function pushStats(managementEndpoint, instanceId, remoteToken, next) { function pushStats(instanceId, remoteToken, next) {
if (process.env.PUSH_STATS === 'false') { if (process.env.PUSH_STATS === 'false') {
return; return;
} }
getStats().pipe( getStats()
postStats(managementEndpoint, instanceId, remoteToken, next)); .on('error', err => {
setTimeout(pushStats, pushReportDelay, /* If the management process launches too quick, the cloud server may
managementEndpoint, instanceId, remoteToken); * not be listening yet. */
logger.info('failed to get stats', { err });
})
.pipe(
postStats(instanceId, remoteToken, next));
setTimeout(pushStats, pushReportDelay, instanceId, remoteToken);
} }
/** /**
@ -115,20 +125,23 @@ function pushStats(managementEndpoint, instanceId, remoteToken, next) {
* Periodically polls for configuration updates, and pushes stats at * Periodically polls for configuration updates, and pushes stats at
* a fixed interval. * a fixed interval.
* *
* @param {string} managementEndpoint API endpoint
* @param {string} instanceId UUID of this deployment * @param {string} instanceId UUID of this deployment
* @param {string} remoteToken API authentication token * @param {string} remoteToken API authentication token
* @param {function} newOverlayCallback Function to call once a new overlay is
* loaded (overlay)
* *
* @returns {undefined} * @returns {undefined}
*/ */
function startPollingManagementClient( function startPollingManagementClient(
managementEndpoint, instanceId, remoteToken) { instanceId, remoteToken, newOverlayCallback) {
overlayMessageListener = newOverlayCallback;
metadata.notifyBucketChange(() => { metadata.notifyBucketChange(() => {
pushStats(managementEndpoint, instanceId, remoteToken); pushStats(instanceId, remoteToken);
}); });
pushStats(managementEndpoint, instanceId, remoteToken); pushStats(instanceId, remoteToken);
applyConfigurationOverlay(managementEndpoint, instanceId, remoteToken, applyConfigurationOverlay(instanceId, remoteToken,
logger.newRequestLogger()); logger.newRequestLogger());
} }

View File

@ -4,19 +4,16 @@ const net = require('net');
const request = require('request'); const request = require('request');
const { URL } = require('url'); const { URL } = require('url');
const WebSocket = require('ws'); const WebSocket = require('ws');
const assert = require('assert');
const _config = require('../Config').config; const _config = require('../Config').config;
const logger = require('../utilities/logger'); const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper'); const metadata = require('../metadata/wrapper');
const { reshapeExceptionError } = arsenal.errorUtils; const { reshapeExceptionError } = arsenal.errorUtils;
const { isManagementAgentUsed } = require('./agentClient');
const { applyAndSaveOverlay } = require('./configuration');
const { const {
ChannelMessageV0, ChannelMessageV0,
MessageType, MessageType,
} = require('./ChannelMessageV0'); } = require('./ChannelMessageV0');
const { pushEndpoint } = require('./constants');
const { const {
CONFIG_OVERLAY_MESSAGE, CONFIG_OVERLAY_MESSAGE,
@ -65,19 +62,23 @@ function createWSAgent(pushEndpoint, env, log) {
return null; return null;
} }
function pushEndpointUrlFromInstanceId(instanceId) {
return `${pushEndpoint}/${instanceId}/ws`;
}
/** /**
* Starts background task that updates configuration and pushes stats. * Starts background task that updates configuration and pushes stats.
* *
* Receives pushed Websocket messages on configuration updates, and * Receives pushed Websocket messages on configuration updates, and
* sends stat messages in response to API sollicitations. * sends stat messages in response to API sollicitations.
* *
* @param {string} pushEndpoint API endpoint * @param {string} instanceId UUID of this deployment
* @param {string} instanceId UUID of this deployment * @param {string} token API authentication token
* @param {string} token API authentication token * @param {function} newOverlayCallback Function to call once a new overlay is
* * loaded (overlay)
* @returns {undefined} * @returns {undefined}
*/ */
function startWSManagementClient(pushEndpoint, instanceId, token) { function startWSManagementClient(instanceId, token, newOverlayCallback) {
logger.info('connecting to push server'); logger.info('connecting to push server');
function _logError(error, errorMessage, method) { function _logError(error, errorMessage, method) {
if (error) { if (error) {
@ -86,14 +87,18 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
} }
} }
overlayMessageListener = newOverlayCallback;
const socketsByChannelId = []; const socketsByChannelId = [];
const headers = { const headers = {
'x-instance-authentication-token': token, 'x-instance-authentication-token': token,
}; };
const agent = createWSAgent(pushEndpoint, process.env, logger); const agent = createWSAgent(pushEndpoint, process.env, logger);
const url = `${pushEndpoint}/${instanceId}/ws`; const url = pushEndpointUrlFromInstanceId(instanceId);
const ws = new WebSocket(url, subprotocols, { headers, agent }); const ws = new WebSocket(url, subprotocols, { headers, agent });
let pingTimeout = null; let pingTimeout = null;
function sendPing() { function sendPing() {
@ -112,7 +117,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
if (process.env.PUSH_STATS === 'false') { if (process.env.PUSH_STATS === 'false') {
return; return;
} }
const fromURL = `http://localhost:${_config.port}/_/report`; const fromURL = process.env.STAT_REPORT_URL ||
`http://localhost:${_config.port}/_/report`;
const fromOptions = { const fromOptions = {
headers: { headers: {
'x-scal-report-token': process.env.REPORT_TOKEN, 'x-scal-report-token': process.env.REPORT_TOKEN,
@ -121,7 +127,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}; };
request(fromURL, fromOptions, (err, response, body) => { request(fromURL, fromOptions, (err, response, body) => {
if (err) { if (err) {
_logError(err, 'failed to get metrics report', 'pushStats'); const what = 'failed to get metrics report';
const msg =
`${what} at ${fromURL} with headers ${fromOptions}`;
_logError(err, msg, 'pushStats');
return; return;
} }
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body), ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
@ -204,8 +213,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
metadata.notifyBucketChange(null); metadata.notifyBucketChange(null);
_config.removeListener('browser-access-enabled-change', _config.removeListener('browser-access-enabled-change',
browserAccessChangeHandler); browserAccessChangeHandler);
setTimeout(startWSManagementClient, 10000, pushEndpoint, const timeout = process.env.ORBIT_CONNECTION_TIMEOUT_MS || 10000;
instanceId, token);
setTimeout(startWSManagementClient, timeout, instanceId, token,
newOverlayCallback);
}); });
ws.on('error', err => { ws.on('error', err => {
@ -224,17 +235,12 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}); });
ws.on('message', data => { ws.on('message', data => {
const log = logger.newRequestLogger();
const message = new ChannelMessageV0(data); const message = new ChannelMessageV0(data);
switch (message.getType()) { switch (message.getType()) {
case CONFIG_OVERLAY_MESSAGE: case CONFIG_OVERLAY_MESSAGE:
if (!isManagementAgentUsed()) { if (overlayMessageListener) {
applyAndSaveOverlay(JSON.parse(message.getPayload()), log); overlayMessageListener(message.getPayload().toString());
} else {
if (overlayMessageListener) {
overlayMessageListener(message.getPayload().toString());
}
} }
break; break;
case METRICS_REQUEST_MESSAGE: case METRICS_REQUEST_MESSAGE:
@ -256,13 +262,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
}); });
} }
function addOverlayMessageListener(callback) {
assert(typeof callback === 'function');
overlayMessageListener = callback;
}
module.exports = { module.exports = {
createWSAgent, createWSAgent,
startWSManagementClient, startWSManagementClient,
addOverlayMessageListener, pushEndpointUrlFromInstanceId,
}; };

View File

@ -14,11 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
const api = require('./api/api'); const api = require('./api/api');
const data = require('./data/wrapper'); const data = require('./data/wrapper');
const metadata = require('./metadata/wrapper'); const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management'); const { initManagementClient } = require('./management/agentClient');
const {
initManagementClient,
isManagementAgentUsed,
} = require('./management/agentClient');
const routes = arsenal.s3routes.routes; const routes = arsenal.s3routes.routes;
const websiteEndpoints = _config.websiteEndpoints; const websiteEndpoints = _config.websiteEndpoints;
@ -40,7 +36,6 @@ const STATS_INTERVAL = 5; // 5 seconds
const STATS_EXPIRY = 30; // 30 seconds const STATS_EXPIRY = 30; // 30 seconds
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL, const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
STATS_EXPIRY); STATS_EXPIRY);
const enableRemoteManagement = true;
class S3Server { class S3Server {
/** /**
@ -153,15 +148,7 @@ class S3Server {
// TODO this should wait for metadata healthcheck to be ok // TODO this should wait for metadata healthcheck to be ok
// TODO only do this in cluster master // TODO only do this in cluster master
if (enableRemoteManagement) { initManagementClient();
if (!isManagementAgentUsed()) {
setTimeout(() => {
initManagement(logger.newRequestLogger());
}, 5000);
} else {
initManagementClient();
}
}
} }
/* /*

View File

@ -5,19 +5,20 @@ const logger = require('./lib/utilities/logger');
const { initManagement } = require('./lib/management'); const { initManagement } = require('./lib/management');
const _config = require('./lib/Config').config; const _config = require('./lib/Config').config;
const { managementAgentMessageType } = require('./lib/management/agentClient'); const { managementAgentMessageType } = require('./lib/management/agentClient');
const { addOverlayMessageListener } = require('./lib/management/push');
const { saveConfigurationVersion } = require('./lib/management/configuration'); const { saveConfigurationVersion } = require('./lib/management/configuration');
const {
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
WS_STATUS_IDLE,
} = require('./lib/management/constants');
// TODO: auth? // TODO: auth?
// TODO: werelogs with a specific name. // TODO: werelogs with a specific name.
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
class ManagementAgentServer { class ManagementAgentServer {
constructor() { constructor() {
this.port = _config.managementAgent.port || 8010; this.port = _config.managementAgent.port || 9753;
this.wss = null; this.wss = null;
this.loadedOverlay = null; this.loadedOverlay = null;
@ -29,25 +30,21 @@ class ManagementAgentServer {
process.on('SIGPIPE', () => {}); process.on('SIGPIPE', () => {});
} }
start(_cb) { start() {
const cb = _cb || function noop() {};
/* Define REPORT_TOKEN env variable needed by the management /* Define REPORT_TOKEN env variable needed by the management
* module. */ * module. */
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|| _config.reportToken || _config.reportToken
|| Uuid.v4(); || Uuid.v4();
initManagement(logger.newRequestLogger(), overlay => { /* The initManegement function retries when it fails. */
let error = null; const log = logger.newRequestLogger();
initManagement(log, this.onNewOverlay.bind(this), (err, overlay) => {
if (overlay) { if (err) {
this.loadedOverlay = overlay; process.exit(0);
this.startServer();
} else {
error = new Error('failed to init management');
} }
return cb(error); this.loadedOverlay = overlay;
this.startServer();
}); });
} }
@ -75,8 +72,6 @@ class ManagementAgentServer {
setInterval(this.checkBrokenConnections.bind(this), setInterval(this.checkBrokenConnections.bind(this),
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS); CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
addOverlayMessageListener(this.onNewOverlay.bind(this));
} }
onConnection(socket, request) { onConnection(socket, request) {
@ -154,6 +149,14 @@ class ManagementAgentServer {
return; return;
} }
this.loadedOverlay = remoteOverlayObj; this.loadedOverlay = remoteOverlayObj;
if (this.loadedOverlay.browserAccess) {
if (Boolean(_config.browserAccessEnabled) !==
Boolean(this.loadedOverlay.browserAccess.enabled)) {
_config.browserAccessEnabled =
Boolean(this.loadedOverlay.browserAccess.enabled);
_config.emit('browser-access-enabled-change');
}
}
this.wss.clients.forEach( this.wss.clients.forEach(
this._sendNewOverlayToClient.bind(this) this._sendNewOverlayToClient.bind(this)
); );
@ -166,7 +169,7 @@ class ManagementAgentServer {
logger.info('close broken connection', { logger.info('close broken connection', {
client: client._socket._peername, client: client._socket._peername,
}); });
client.terminate(); client.close(WS_STATUS_IDLE.code, WS_STATUS_IDLE.reason);
return; return;
} }
client.isAlive = false; client.isAlive = false;

View File

@ -84,17 +84,19 @@
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js", "ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util", "ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch", "ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
"ft_management_agent": "cd tests/functional/managementAgent/ && npm test",
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7", "install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
"lint": "eslint $(git ls-files '*.js')", "lint": "eslint $(git ls-files '*.js')",
"lint_md": "mdlint $(git ls-files '*.md')", "lint_md": "mdlint $(git ls-files '*.md')",
"mem_backend": "S3BACKEND=mem node index.js", "mem_backend": "S3BACKEND=mem node index.js",
"perf": "mocha tests/performance/s3standard.js", "perf": "mocha tests/performance/s3standard.js",
"start": "npm-run-all --parallel start_dmd start_s3server", "start": "npm-run-all --parallel start_dmd start_management start_s3server",
"start_mongo": "npm run cloudserver", "start_mongo": "npm run cloudserver",
"start_mdserver": "node mdserver.js", "start_mdserver": "node mdserver.js",
"start_dataserver": "node dataserver.js", "start_dataserver": "node dataserver.js",
"start_s3server": "node index.js", "start_s3server": "node index.js",
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver", "start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
"start_management": "node managementAgent.js",
"start_utapi": "node lib/utapi/utapi.js", "start_utapi": "node lib/utapi/utapi.js",
"utapi_replay": "node lib/utapi/utapiReplay.js", "utapi_replay": "node lib/utapi/utapiReplay.js",
"management_agent": "node managementAgent.js", "management_agent": "node managementAgent.js",

View File

@ -0,0 +1,425 @@
'use strict'; // eslint-disable-line strict
const assert = require('assert');
const EventEmitter = require('events');
const http = require('http');
const net = require('net');
const URL = require('url');
const WebSocket = require('ws');
const _config = require('../../../lib/Config').config;
const {
managementAgentMessageType,
} = require('../../../lib/management/agentClient');
const {
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
WS_STATUS_IDLE,
} = require('../../../lib/management/constants');
const {
OrbitSimulatorServer,
OrbitSimulatorEvent,
} = require('./orbitServerSimulator');
const {
ChannelMessageV0,
MessageType,
} = require('../../../lib/management/ChannelMessageV0');
const PAYLOAD_MSG_TYPE = MessageType.CHANNEL_PAYLOAD_MESSAGE;
const CONFIG_OVERLAY_MSG_TYPE = MessageType.CONFIG_OVERLAY_MESSAGE;
const METRICS_REQ_MSG_TYPE = MessageType.METRICS_REQUEST_MESSAGE;
const METRICS_REPORT_MSG_TYPE = MessageType.METRICS_REPORT_MESSAGE;
const CHAN_CLOSE_MSG_TYPE = MessageType.CHANNEL_CLOSE_MESSAGE;
function testWithBrowserAccessEnabled() {
before(done => {
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
const overlay = { browserAccess: { enabled: false } };
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
orbitSimulator.stop(done);
});
orbitSimulator.start();
});
it('should not forward payload message if no overlay has been received',
done => {
const chanId = 1;
const orbitSimulator = new OrbitSimulatorServer();
/* Payload message socket server. The management process forwards
* payload message from Orbit. */
const server = new net.Server();
server.on('connection', () => {
done(new Error('should not connect to payload message server'));
});
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
/* Once the management process is connected to orbit, send it a
* payload message. It is supposed to create the channel ID socket
* with the payload message server. */
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
setTimeout(() => {
server.close(() => {
orbitSimulator.stop(done);
});
}, 4000);
});
orbitSimulator.start();
});
/* XXX: this test set the managementProcess config browserAccessEnabled to
* true, allowing it to forward payload message. This internal state value
* is required for the following tests. */
it('should close a channel ID when requested', done => {
const chanId = 1;
let closeMessageSent = false;
const orbitSimulator = new OrbitSimulatorServer();
/* Payload message socket server. The management process forwards
* payload message from Orbit. */
const server = new net.Server();
server.on('connection', s => {
s.on('close', hadError => {
/* Make sure the connection had been closed after the CLOSE
* message has been sent by Orbit and there is no error. */
assert.strictEqual(hadError, false);
assert.strictEqual(closeMessageSent, true);
server.close(() => { orbitSimulator.stop(done); });
});
s.on('data', () => {
/* Once the payload message has been forwarded from Orbit
* to the socket server by the management process, make
* Orbit request this channel close. */
closeMessageSent = true;
orbitSimulator.sendMessage(CHAN_CLOSE_MSG_TYPE, '', chanId);
});
});
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
/* Once the management process is connected to orbit, send it a
* payload message to create the channel ID with the payload
* message server. */
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
const overlay = { browserAccess: { enabled: true } };
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
setTimeout(() => {
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
}, 1000);
});
orbitSimulator.start();
});
/* When the process management receives a payload message associated to
* a specific channel ID from Orbit, it forwards this message to a
* configured host/port socket.
* When it received data on this socket, it forwards it to Orbit. */
it('should forward payload message', done => {
const payload = 'payload';
const chanId = 1;
let socket = null;
/* Payload message socket server. The management process forwards
* payload message from Orbit. */
const server = new net.Server();
server.on('connection', s => {
socket = s;
/* After receiving the payload message sent by Orbit and
* forwarded by the management process, reply on the socket. */
socket.on('data', data => {
assert.strictEqual(data.toString(), payload);
socket.write(payload);
});
});
/* Once the management process is connected to orbit, send it a
* payload message. */
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId);
});
/* Check orbit received the message. */
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
const message = new ChannelMessageV0(data);
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
assert.strictEqual(message.payload.toString(), payload);
assert.strictEqual(message.getChannelNumber(), chanId);
socket.end();
server.close(() => { orbitSimulator.stop(done); });
});
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
orbitSimulator.start();
});
it('should use one TCP socket per channel ID', done => {
const payload = 'payload';
const chanId1 = 1;
const chanId2 = 2;
const evtEmitter = new EventEmitter();
const clients = [];
const payloadMsgFwdedEvent = 'payloadMessageForwarded';
let socketServerFirstMsgReceived = false;
let orbitFirstMsgReceived = false;
/* Payload message socket server. The management process forwards
* payload message from Orbit. */
const server = new net.Server();
server.on('connection', s => {
assert.equal(clients.indexOf(s), -1);
clients.push(s);
s.on('data', () => { evtEmitter.emit(payloadMsgFwdedEvent); });
});
/* Once the management process is connected to orbit, send it a
* payload message. */
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId1);
});
/* When the socket server receives the 1st payload, send an other
* one with a different channel id. In both cases, reply with a
* payload. */
evtEmitter.on(payloadMsgFwdedEvent, () => {
if (!socketServerFirstMsgReceived) {
assert.equal(clients.length, 1);
socketServerFirstMsgReceived = true;
clients[0].write(payload);
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload,
chanId2);
return;
}
assert.equal(clients.length, 2);
clients[1].write(payload);
});
/* Check orbit received the 2 messages. */
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
const message = new ChannelMessageV0(data);
if (!orbitFirstMsgReceived) {
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
assert.strictEqual(message.payload.toString(), payload);
assert.strictEqual(message.getChannelNumber(), chanId1);
orbitFirstMsgReceived = true;
return;
}
/* Ignore CHANNEL_CLOSE_MESSAGE */
if (message.getType() !== PAYLOAD_MSG_TYPE) {
return;
}
assert.strictEqual(message.payload.toString(), payload);
assert.strictEqual(message.getChannelNumber(), chanId2);
clients.forEach(client => { client.end(); });
server.close(() => { orbitSimulator.stop(done); });
});
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
orbitSimulator.start();
});
}
/* Test the management process.
*
* This process connects to Orbit WebSocket server and is itself a WebSocket
* server. Testing it requires:
* - an orbit simulator,
* - a WebSocket client to connect to it,
* - HTTP server to answer the stats requests,
* - a socket server for payload messages.
**/
describe('Management process', function testSuite() {
this.timeout(120000);
/* Make sure the process management send payload message to a local host
* socket to be able to receive this message in this test. */
assert.strictEqual(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST,
'localhost');
function createWs(path) {
const host = _config.managementAgent.host;
const port = _config.managementAgent.port;
const url = `ws://${host}:${port}/${path || 'watch'}`;
return new WebSocket(url);
}
it('should not listen on others routes than `watch`', done => {
const ws = createWs('wrong_path');
const msg = 'management agent process should not listen this route';
ws.on('open', () => { done(new Error(msg)); });
ws.on('unexpected-response', (_, response) => {
assert.strictEqual(response.statusCode, 400);
done();
});
});
it('should listen on `watch` route', done => {
const ws = createWs();
ws.on('open', done);
ws.on('error', error => { done(error); });
});
it('should terminate the connection when a client does not answer ping',
done => {
this.timeout(2 * CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
const ws = createWs();
ws.on('close', (code, reason) => {
assert.strictEqual(code, WS_STATUS_IDLE.code);
assert.strictEqual(reason, WS_STATUS_IDLE.reason);
done();
});
ws.on('error', error => { done(error); });
ws.on('message', () => {
/* Ugly eventTarget internal fields hacking to avoid this web
* socket to answer to ping messages. It will make
* the management agent to close the connection after a timeout.
* Defining an onPing event does not help, this internal
* function is still called. */
ws._receiver._events.ping = function noop() {};
});
});
it('should connect to orbit', done => {
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
orbitSimulator.stop(done);
});
orbitSimulator.start();
});
it('should save the last overlay and send it to client on connection',
done => {
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
/* Send an overlay to management process. */
const body = 'body';
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, body);
/* Connect to the process manager and check its saved
* overlay. */
const ws = createWs();
ws.on('error', error => { done(new Error(error)); });
ws.on('message', data => {
const msg = JSON.parse(data);
const type = managementAgentMessageType.NEW_OVERLAY;
assert.strictEqual(msg.messageType, type);
assert.strictEqual(msg.payload.toString(), body);
ws.terminate();
orbitSimulator.stop(done);
});
});
orbitSimulator.start();
});
it('should send new overlay to its client', done => {
const body = 'body';
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
/* Connect to management process to receive the first overlay.
**/
const ws = createWs();
let firstMsgReceived = false;
ws.on('error', () => { done(new Error('connection error')); });
ws.on('message', data => {
const msg = JSON.parse(data);
if (!firstMsgReceived) {
firstMsgReceived = true;
/* Send a new overlay to management process. */
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE,
body);
return;
}
/* Check we receive the second overlay. */
assert.strictEqual(msg.payload.toString(), body);
ws.terminate();
orbitSimulator.stop(done);
});
});
orbitSimulator.start();
});
it('should get and send stats to orbit on stat requests', done => {
const stats = { stats: 'stats' };
/* Mock stats server, it returns a JSON object stringified on
* request on the stats path. */
const url = URL.parse(process.env.STAT_REPORT_URL);
const statServer = http.createServer((request, response) => {
if (request.url !== url.pathname) {
response.writeHead(400, { 'Content-type': 'text/plan' });
response.write('bad path');
response.end();
return;
}
response.writeHead(200, { 'Content-type': 'text/plan' });
response.write(JSON.stringify(stats));
response.end();
});
statServer.listen(url.port);
/* Once the management process is connected to orbit, send it a
* stat request. */
const orbitSimulator = new OrbitSimulatorServer();
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
orbitSimulator.sendMessage(METRICS_REQ_MSG_TYPE, 'data');
});
/* And finally check the management process replies by a metrics
* report message. */
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
const message = new ChannelMessageV0(data);
assert.strictEqual(message.getType(), METRICS_REPORT_MSG_TYPE);
assert.deepStrictEqual(JSON.parse(message.payload.toString()),
stats);
statServer.close(() => { orbitSimulator.stop(done); });
});
orbitSimulator.start();
});
/* Test suite which requires the management process to have received an
* overlay with the browserAccessEnabled set to true, otherwise payload
* message are not forwarded. This test suite is separated from the
* other tests because of its internal requirement. As long as there is
* no tests launching/stoping the management process there is no
* better way to test this. Anonymous function not used here to save an
* indentation level. */
describe('Management process with browser access enabled',
testWithBrowserAccessEnabled);
});

View File

@ -0,0 +1,6 @@
{
"tests": {
"files": [ "/test" ],
"on": "aggressor"
}
}

View File

@ -0,0 +1,126 @@
const assert = require('assert');
const { EventEmitter } = require('events');
const url = require('url');
const WebSocket = require('ws');
const logger = require('../../../lib/utilities/logger');
const OrbitSimulatorEvent = {
EVENT_LISTENING: 'listening',
EVENT_CONNECTION: 'connection',
EVENT_MESSAGE: 'message',
};
const {
ChannelMessageV0,
MessageType,
} = require('../../../lib/management/ChannelMessageV0');
const {
pushEndpointUrlFromInstanceId,
} = require('../../../lib/management/push');
class OrbitSimulatorServer extends EventEmitter {
constructor() {
super();
assert(process.env.INITIAL_INSTANCE_ID,
'INITIAL_INSTANCE_ID env variable is required');
const instanceId = process.env.INITIAL_INSTANCE_ID;
const endpointUrl = pushEndpointUrlFromInstanceId(instanceId);
const endpointUrlObj = url.parse(endpointUrl);
assert(endpointUrlObj.host, 'localhost');
this.wss = null;
this.port = endpointUrlObj.port;
this.path = endpointUrlObj.path;
this.stop = this.stop.bind(this);
process.on('SIGINT', this.stop);
process.on('SIGHUP', this.stop);
process.on('SIGQUIT', this.stop);
process.on('SIGTERM', this.stop);
process.on('SIGPIPE', () => {});
}
start() {
this.wss = new WebSocket.Server({
port: this.port,
path: this.path,
clientTracking: true,
});
this.wss.on('connection', clientWs => {
clientWs.on('message', data => {
this.emit(OrbitSimulatorEvent.EVENT_MESSAGE, data);
});
this.emit(OrbitSimulatorEvent.EVENT_CONNECTION);
});
this.wss.on('listening', () => {
this.emit(OrbitSimulatorEvent.EVENT_LISTENING);
});
this.wss.on('error', error => {
logger.error('orbit CI simulator error', { error });
});
}
stop(cb) {
if (!this.wss) {
if (cb) {
cb();
}
return;
}
this.wss.close(() => {
if (cb) {
cb();
}
});
}
sendMessage(messageType, body, channelId) {
let message = null;
switch (messageType) {
case MessageType.CONFIG_OVERLAY_MESSAGE:
assert(body);
message = ChannelMessageV0.encodeConfigOverlayMessage(body);
break;
case MessageType.METRICS_REQUEST_MESSAGE:
assert(body);
message = ChannelMessageV0.encodeMetricsRequestMessage(body);
break;
case MessageType.METRICS_REPORT_MESSAGE:
assert(body);
message = ChannelMessageV0.encodeMetricsReportMessage(body);
break;
case MessageType.CHANNEL_CLOSE_MESSAGE:
assert(channelId);
message = ChannelMessageV0.encodeChannelCloseMessage(channelId);
break;
case MessageType.CHANNEL_PAYLOAD_MESSAGE:
assert(body);
assert(channelId);
message = ChannelMessageV0.encodeChannelDataMessage(
channelId,
Buffer.from(body)
);
break;
default:
logger.error('unsupported message type', { messageType });
return;
}
this.wss.clients.forEach(client => { client.send(message); });
}
}
module.exports = {
OrbitSimulatorServer,
OrbitSimulatorEvent,
};

View File

@ -0,0 +1,6 @@
{
"name": "management-agent-tests",
"scripts": {
"test": "mocha -t 40000 *.js"
}
}