Compare commits
11 Commits
developmen
...
feature/ZE
Author | SHA1 | Date |
---|---|---|
jeremyds | 240d16a280 | |
jeremyds | ce8e01f37c | |
jeremyds | 81181892d5 | |
jeremyds | b3b371d597 | |
jeremyds | 1804d8b10a | |
jeremyds | 84c51bdac8 | |
jeremyds | 9f6c5fac73 | |
jeremyds | 5a578b3ef0 | |
jeremyds | 5dc99e24aa | |
jeremyds | 052628a9d4 | |
jeremyds | 37a4fc30e0 |
|
@ -26,3 +26,4 @@ ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
|
|||
CMD [ "npm", "start" ]
|
||||
|
||||
EXPOSE 8000
|
||||
EXPOSE 9753
|
||||
|
|
45
eve/main.yml
45
eve/main.yml
|
@ -65,6 +65,16 @@ models:
|
|||
S3BACKEND: "mem"
|
||||
MPU_TESTING: "yes"
|
||||
S3METADATA: mongodb
|
||||
- env: &management-vars
|
||||
S3BACKEND: "file"
|
||||
S3METADATA: mongodb
|
||||
PUSH_ENDPOINT: "http://localhost:9989/endpoint"
|
||||
STAT_REPORT_URL: "http://localhost:8081/stat"
|
||||
SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST: "localhost"
|
||||
SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT: "8082"
|
||||
MANAGEMENT_CHECK_CLIENT_FQCY_MS: "2000"
|
||||
ORBIT_CONNECTION_TIMEOUT_MS: "1000"
|
||||
MANAGEMENT_MODE: "push"
|
||||
- env: &multiple-backend-vars
|
||||
S3BACKEND: "mem"
|
||||
S3DATA: "multiple"
|
||||
|
@ -121,6 +131,7 @@ stages:
|
|||
- file-ft-tests
|
||||
- multiple-backend-test
|
||||
- mongo-ft-tests
|
||||
- management-agent-tests
|
||||
waitForFinish: True
|
||||
haltOnFailure: True
|
||||
|
||||
|
@ -165,6 +176,7 @@ stages:
|
|||
vars:
|
||||
aggressorMemLimit: "2Gi"
|
||||
s3MemLimit: "2Gi"
|
||||
managementProcess: disabled
|
||||
env:
|
||||
<<: *multiple-backend-vars
|
||||
<<: *global-env
|
||||
|
@ -209,6 +221,7 @@ stages:
|
|||
aggressorMemLimit: "2Gi"
|
||||
s3MemLimit: "1664Mi"
|
||||
redis: enabled
|
||||
managementProcess: disabled
|
||||
env:
|
||||
<<: *mongo-vars
|
||||
<<: *global-env
|
||||
|
@ -227,6 +240,37 @@ stages:
|
|||
<<: *global-env
|
||||
- Upload: *upload-artifacts
|
||||
|
||||
management-agent-tests:
|
||||
worker: &management-pod
|
||||
type: kube_pod
|
||||
path: eve/workers/pod.yaml
|
||||
images:
|
||||
aggressor: eve/workers/build
|
||||
management_agent: "."
|
||||
vars:
|
||||
aggressorMemLimit: "1Gi"
|
||||
managementProcess: enabled
|
||||
env:
|
||||
<<: *management-vars
|
||||
<<: *global-env
|
||||
|
||||
steps:
|
||||
- Git: *clone
|
||||
- ShellCommand: *credentials
|
||||
- ShellCommand: *npm-install
|
||||
- ShellCommand:
|
||||
command: |
|
||||
bash -c "
|
||||
set -ex
|
||||
bash wait_for_local_port.bash 9753 40
|
||||
source /artifacts/management_test_instance_id
|
||||
npm run ft_management_agent"
|
||||
env:
|
||||
<<: *mongo-vars
|
||||
<<: *management-vars
|
||||
<<: *global-env
|
||||
- Upload: *upload-artifacts
|
||||
|
||||
file-ft-tests:
|
||||
worker:
|
||||
type: kube_pod
|
||||
|
@ -238,6 +282,7 @@ stages:
|
|||
aggressorMemLimit: "1920Mi"
|
||||
s3MemLimit: "2Gi"
|
||||
redis: enabled
|
||||
managementProcess: disabled
|
||||
env:
|
||||
<<: *file-mem-mpu
|
||||
<<: *global-env
|
||||
|
|
|
@ -40,12 +40,13 @@ spec:
|
|||
readOnly: false
|
||||
mountPath: /root/.aws
|
||||
- name: artifacts
|
||||
readOnly: true
|
||||
readOnly: false
|
||||
mountPath: /artifacts
|
||||
command:
|
||||
- bash
|
||||
- -lc
|
||||
- |
|
||||
echo "export INITIAL_INSTANCE_ID=$(od -x /dev/urandom | head -1 | awk '{OFS="-"; print $2$3,$4,$5,$6,$7$8$9}')" > /artifacts/management_test_instance_id
|
||||
buildbot-worker create-worker . $BUILDMASTER:$BUILDMASTER_PORT $WORKERNAME $WORKERPASS
|
||||
buildbot-worker start --nodaemon
|
||||
env:
|
||||
|
@ -61,6 +62,7 @@ spec:
|
|||
- name: {{ key }}
|
||||
value: "{{ value }}"
|
||||
{% endfor %}
|
||||
{% if vars.managementProcess is not defined or vars.managementProcess != "enabled" -%}
|
||||
- name: s3
|
||||
image: {{ images.s3 }}
|
||||
imagePullPolicy: IfNotPresent
|
||||
|
@ -114,6 +116,64 @@ spec:
|
|||
- name: {{ key }}
|
||||
value: "{{ value }}"
|
||||
{% endfor %}
|
||||
{%- endif %}
|
||||
{% if vars.managementProcess is defined and vars.managementProcess == "enabled" -%}
|
||||
- name: management
|
||||
image: {{ images.management_agent }}
|
||||
imagePullPolicy: IfNotPresent
|
||||
resources:
|
||||
requests:
|
||||
cpu: 100m
|
||||
memory: 1Gi
|
||||
limits:
|
||||
cpu: 200m
|
||||
memory: 1Gi
|
||||
volumeMounts:
|
||||
- name: creds
|
||||
readOnly: false
|
||||
mountPath: /root/.aws
|
||||
- name: certs
|
||||
readOnly: true
|
||||
mountPath: /tmp
|
||||
- name: artifacts
|
||||
readOnly: false
|
||||
mountPath: /artifacts
|
||||
command:
|
||||
- bash
|
||||
- -ec
|
||||
- |
|
||||
sleep 10 # wait for mongo
|
||||
source /artifacts/management_test_instance_id
|
||||
/usr/src/app/docker-entrypoint.sh npm run management_agent | tee -a /artifacts/management.log
|
||||
env:
|
||||
{% if vars.env.S3DATA is defined and vars.env.S3DATA == "multiple" -%}
|
||||
- name: S3_LOCATION_FILE
|
||||
value: "/usr/src/app/tests/locationConfig/locationConfigTests.json"
|
||||
{%- endif %}
|
||||
- name: CI
|
||||
value: "true"
|
||||
- name: ENABLE_LOCAL_CACHE
|
||||
value: "true"
|
||||
- name: MONGODB_HOSTS
|
||||
value: "localhost:27018"
|
||||
- name: MONGODB_RS
|
||||
value: "rs0"
|
||||
- name: REDIS_HOST
|
||||
value: "localhost"
|
||||
- name: REDIS_PORT
|
||||
value: "6379"
|
||||
- name: REPORT_TOKEN
|
||||
value: "report-token-1"
|
||||
- name: REMOTE_MANAGEMENT_DISABLE
|
||||
value: "0"
|
||||
- name: HEALTHCHECKS_ALLOWFROM
|
||||
value: "0.0.0.0/0"
|
||||
{% for key, value in vars.env.items() %}
|
||||
- name: {{ key }}
|
||||
value: "{{ value }}"
|
||||
{% endfor %}
|
||||
{%- endif %}
|
||||
|
||||
{% if vars.redis is defined and vars.redis == "enabled" -%}
|
||||
- name: redis
|
||||
image: redis:alpine
|
||||
|
|
|
@ -951,7 +951,7 @@ class Config extends EventEmitter {
|
|||
}
|
||||
|
||||
this.managementAgent = {};
|
||||
this.managementAgent.port = 8010;
|
||||
this.managementAgent.port = 9753;
|
||||
this.managementAgent.host = 'localhost';
|
||||
if (config.managementAgent !== undefined) {
|
||||
if (config.managementAgent.port !== undefined) {
|
||||
|
|
|
@ -100,7 +100,7 @@ class ChannelMessageV0 {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a metrics message
|
||||
* Creates a wire representation of a metrics report message
|
||||
*
|
||||
* @param {object} body Metrics report
|
||||
*
|
||||
|
@ -116,6 +116,40 @@ class ChannelMessageV0 {
|
|||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a configuration overlay message
|
||||
*
|
||||
* @param {object} body configuration overlay
|
||||
*
|
||||
* @returns {Buffer} wire representation
|
||||
*/
|
||||
static encodeConfigOverlayMessage(body) {
|
||||
const overlay = JSON.stringify(body);
|
||||
const buf = Buffer.alloc(overlay.length + headerSize);
|
||||
buf.writeUInt8(MessageType.CONFIG_OVERLAY_MESSAGE, 0);
|
||||
buf.writeUInt8(0, 1);
|
||||
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||
buf.write(overlay, headerSize);
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a wire representation of a metric request message
|
||||
*
|
||||
* @param {object} body Metrics request
|
||||
*
|
||||
* @returns {Buffer} wire representation
|
||||
*/
|
||||
static encodeMetricsRequestMessage(body) {
|
||||
const request = JSON.stringify(body);
|
||||
const buf = Buffer.alloc(request.length + headerSize);
|
||||
buf.writeUInt8(MessageType.METRICS_REQUEST_MESSAGE, 0);
|
||||
buf.writeUInt8(0, 1);
|
||||
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||
buf.write(request, headerSize);
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol name used for subprotocol negociation
|
||||
*/
|
||||
|
|
|
@ -82,13 +82,7 @@ function initManagementClient() {
|
|||
});
|
||||
}
|
||||
|
||||
function isManagementAgentUsed() {
|
||||
return process.env.MANAGEMENT_USE_AGENT === '1';
|
||||
}
|
||||
|
||||
|
||||
module.exports = {
|
||||
managementAgentMessageType,
|
||||
initManagementClient,
|
||||
isManagementAgentUsed,
|
||||
};
|
||||
|
|
|
@ -318,33 +318,10 @@ function loadCachedOverlay(log, callback) {
|
|||
});
|
||||
}
|
||||
|
||||
function applyAndSaveOverlay(overlay, log) {
|
||||
patchConfiguration(overlay, log, err => {
|
||||
if (err) {
|
||||
log.error('could not apply pushed overlay', {
|
||||
error: reshapeExceptionError(err),
|
||||
method: 'applyAndSaveOverlay',
|
||||
});
|
||||
return;
|
||||
}
|
||||
saveConfigurationVersion(null, overlay, log, err => {
|
||||
if (err) {
|
||||
log.error('could not cache overlay version', {
|
||||
error: reshapeExceptionError(err),
|
||||
method: 'applyAndSaveOverlay',
|
||||
});
|
||||
return;
|
||||
}
|
||||
log.info('overlay push processed');
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
loadCachedOverlay,
|
||||
managementDatabaseName,
|
||||
patchConfiguration,
|
||||
saveConfigurationVersion,
|
||||
remoteOverlayIsNewer,
|
||||
applyAndSaveOverlay,
|
||||
};
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/* From RFC6455 which defines some reserved status code ranges, the range
|
||||
* 4000-4999 are for private use. */
|
||||
const WS_STATUS_IDLE = {
|
||||
code: 4000,
|
||||
reason: 'does not reply to ping before timeout',
|
||||
};
|
||||
|
||||
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS =
|
||||
process.env.MANAGEMENT_CHECK_CLIENT_FQCY_MS || 15000;
|
||||
|
||||
const endpointPath = 'api/v1/instance';
|
||||
|
||||
const managementEndpointRoot =
|
||||
process.env.MANAGEMENT_ENDPOINT ||
|
||||
'https://api.zenko.io';
|
||||
const managementEndpoint = `${managementEndpointRoot}/${endpointPath}`;
|
||||
|
||||
const pushEndpointRoot =
|
||||
process.env.PUSH_ENDPOINT ||
|
||||
'https://push.api.zenko.io';
|
||||
const pushEndpoint = `${pushEndpointRoot}/${endpointPath}`;
|
||||
|
||||
|
||||
module.exports = {
|
||||
WS_STATUS_IDLE,
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||
managementEndpoint,
|
||||
pushEndpoint,
|
||||
};
|
|
@ -3,6 +3,7 @@ const forge = require('node-forge');
|
|||
const request = require('request');
|
||||
|
||||
const metadata = require('../metadata/wrapper');
|
||||
const { managementEndpoint } = require('./constants');
|
||||
|
||||
const managementDatabaseName = 'PENSIEVE';
|
||||
const tokenConfigurationKey = 'auth/zenko/remote-management-token';
|
||||
|
@ -25,7 +26,7 @@ function getStoredCredentials(log, callback) {
|
|||
log, callback);
|
||||
}
|
||||
|
||||
function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
||||
function issueCredentials(instanceId, log, callback) {
|
||||
log.info('registering with API to get token');
|
||||
|
||||
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
|
||||
|
@ -54,8 +55,7 @@ function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
|||
}).json(postData);
|
||||
}
|
||||
|
||||
function confirmInstanceCredentials(
|
||||
managementEndpoint, instanceId, creds, log, callback) {
|
||||
function confirmInstanceCredentials(instanceId, creds, log, callback) {
|
||||
const opts = {
|
||||
headers: {
|
||||
'x-instance-authentication-token': creds.token,
|
||||
|
@ -84,7 +84,6 @@ function confirmInstanceCredentials(
|
|||
* is registered as new against the Orbit API with newly-generated
|
||||
* RSA key pair.
|
||||
*
|
||||
* @param {string} managementEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||
* initialization process
|
||||
|
@ -92,12 +91,11 @@ function confirmInstanceCredentials(
|
|||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function initManagementCredentials(
|
||||
managementEndpoint, instanceId, log, callback) {
|
||||
function initManagementCredentials(instanceId, log, callback) {
|
||||
getStoredCredentials(log, (error, value) => {
|
||||
if (error) {
|
||||
if (error.NoSuchKey) {
|
||||
return issueCredentials(managementEndpoint, instanceId, log,
|
||||
return issueCredentials(instanceId, log,
|
||||
(error, value) => {
|
||||
if (error) {
|
||||
log.error('could not issue token',
|
||||
|
@ -118,8 +116,7 @@ function initManagementCredentials(
|
|||
log.info('saved token locally, ' +
|
||||
'confirming instance');
|
||||
return confirmInstanceCredentials(
|
||||
managementEndpoint, instanceId, value, log,
|
||||
callback);
|
||||
instanceId, value, log, callback);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
const arsenal = require('arsenal');
|
||||
const errors = arsenal.errors;
|
||||
const async = require('async');
|
||||
|
||||
const metadata = require('../metadata/wrapper');
|
||||
|
@ -13,19 +14,9 @@ const { initManagementCredentials } = require('./credentials');
|
|||
const { startWSManagementClient } = require('./push');
|
||||
const { startPollingManagementClient } = require('./poll');
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
const { isManagementAgentUsed } = require('./agentClient');
|
||||
|
||||
const initRemoteManagementRetryDelay = 10000;
|
||||
|
||||
const managementEndpointRoot =
|
||||
process.env.MANAGEMENT_ENDPOINT ||
|
||||
'https://api.zenko.io';
|
||||
const managementEndpoint = `${managementEndpointRoot}/api/v1/instance`;
|
||||
|
||||
const pushEndpointRoot =
|
||||
process.env.PUSH_ENDPOINT ||
|
||||
'https://push.api.zenko.io';
|
||||
const pushEndpoint = `${pushEndpointRoot}/api/v1/instance`;
|
||||
|
||||
function initManagementDatabase(log, callback) {
|
||||
// XXX choose proper owner names
|
||||
|
@ -48,12 +39,12 @@ function initManagementDatabase(log, callback) {
|
|||
});
|
||||
}
|
||||
|
||||
function startManagementListeners(instanceId, token) {
|
||||
function startManagementListeners(instanceId, newOverlayCallback, token) {
|
||||
const mode = process.env.MANAGEMENT_MODE || 'push';
|
||||
if (mode === 'push') {
|
||||
startWSManagementClient(pushEndpoint, instanceId, token);
|
||||
startWSManagementClient(instanceId, token, newOverlayCallback);
|
||||
} else {
|
||||
startPollingManagementClient(managementEndpoint, instanceId, token);
|
||||
startPollingManagementClient(instanceId, token, newOverlayCallback);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -66,14 +57,15 @@ function startManagementListeners(instanceId, token) {
|
|||
* - loading and applying the latest cached overlay configuration
|
||||
* - starting a configuration update and metrics push background task
|
||||
*
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||
* initialization process
|
||||
* @param {function} callback Function to call once the overlay is loaded
|
||||
* (overlay)
|
||||
*
|
||||
* @param {werelogs~Logger} log Request-scoped logger to be able to
|
||||
* trace initialization process
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay
|
||||
* is received (overlay)
|
||||
* @param {function} callback Function to call once the overlay
|
||||
* is loaded (overlay)
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function initManagement(log, callback) {
|
||||
function initManagement(log, newOverlayCallback, callback) {
|
||||
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
||||
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
||||
|| process.env.S3BACKEND === 'mem') {
|
||||
|
@ -81,34 +73,28 @@ function initManagement(log, callback) {
|
|||
return;
|
||||
}
|
||||
|
||||
/* Temporary check before to fully move to the process management agent. */
|
||||
if (isManagementAgentUsed() ^ typeof callback === 'function') {
|
||||
let msg = 'misuse of initManagement function: ';
|
||||
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
|
||||
msg += `, callback type: ${typeof callback}`;
|
||||
throw new Error(msg);
|
||||
}
|
||||
|
||||
async.waterfall([
|
||||
// eslint-disable-next-line arrow-body-style
|
||||
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
|
||||
cb => metadata.setup(cb),
|
||||
cb => initManagementDatabase(log, cb),
|
||||
cb => metadata.getUUID(log, cb),
|
||||
(instanceId, cb) => initManagementCredentials(
|
||||
managementEndpoint, instanceId, log, cb),
|
||||
(instanceId, token, cb) => {
|
||||
if (!isManagementAgentUsed()) {
|
||||
cb(null, instanceId, token, {});
|
||||
return;
|
||||
(instanceId, cb) => {
|
||||
const initialID = process.env.INITIAL_INSTANCE_ID;
|
||||
if (initialID && initialID !== instanceId) {
|
||||
log.error('INITIAL_INSTANCE_ID value is different from ' +
|
||||
'instance id saved in metadata', {
|
||||
envVarValue: initialID,
|
||||
metadataValue: instanceId,
|
||||
});
|
||||
return callback(errors.InvalidParameterValue);
|
||||
}
|
||||
return cb(null, instanceId);
|
||||
},
|
||||
(instanceId, cb) => initManagementCredentials(instanceId, log, cb),
|
||||
(instanceId, token, cb) => {
|
||||
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
|
||||
token, overlay));
|
||||
},
|
||||
(instanceId, token, overlay, cb) => {
|
||||
if (!isManagementAgentUsed()) {
|
||||
cb(null, instanceId, token, overlay);
|
||||
return;
|
||||
}
|
||||
patchConfiguration(overlay, log,
|
||||
err => cb(err, instanceId, token, overlay));
|
||||
},
|
||||
|
@ -119,13 +105,16 @@ function initManagement(log, callback) {
|
|||
method: 'initManagement' });
|
||||
setTimeout(initManagement,
|
||||
initRemoteManagementRetryDelay,
|
||||
logger.newRequestLogger());
|
||||
logger.newRequestLogger(),
|
||||
newOverlayCallback,
|
||||
callback
|
||||
);
|
||||
} else {
|
||||
log.info(`this deployment's Instance ID is ${instanceId}`);
|
||||
log.end('management init done');
|
||||
startManagementListeners(instanceId, token);
|
||||
startManagementListeners(instanceId, newOverlayCallback, token);
|
||||
if (callback) {
|
||||
callback(overlay);
|
||||
callback(null, overlay);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
|
|
@ -7,16 +7,17 @@ const logger = require('../utilities/logger');
|
|||
const metadata = require('../metadata/wrapper');
|
||||
const {
|
||||
loadCachedOverlay,
|
||||
patchConfiguration,
|
||||
saveConfigurationVersion,
|
||||
} = require('./configuration');
|
||||
const { managementEndpoint } = require('./constants');
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
|
||||
const pushReportDelay = 30000;
|
||||
const pullConfigurationOverlayDelay = 60000;
|
||||
|
||||
function loadRemoteOverlay(
|
||||
managementEndpoint, instanceId, remoteToken, cachedOverlay, log, cb) {
|
||||
let overlayMessageListener = null;
|
||||
let cachedOverlay = null;
|
||||
|
||||
function loadRemoteOverlay(instanceId, remoteToken, log, cb) {
|
||||
log.debug('loading remote overlay');
|
||||
const opts = {
|
||||
headers: {
|
||||
|
@ -30,29 +31,34 @@ function loadRemoteOverlay(
|
|||
return cb(error);
|
||||
}
|
||||
if (response.statusCode === 200) {
|
||||
return cb(null, cachedOverlay, body);
|
||||
return cb(null, body);
|
||||
}
|
||||
if (response.statusCode === 404) {
|
||||
return cb(null, cachedOverlay, {});
|
||||
return cb(null, {});
|
||||
}
|
||||
return cb(arsenal.errors.AccessForbidden, cachedOverlay, {});
|
||||
return cb(arsenal.errors.AccessForbidden, {});
|
||||
}).json();
|
||||
}
|
||||
|
||||
// TODO save only after successful patch
|
||||
function applyConfigurationOverlay(
|
||||
managementEndpoint, instanceId, remoteToken, log) {
|
||||
function applyConfigurationOverlay(instanceId, remoteToken, log) {
|
||||
async.waterfall([
|
||||
wcb => loadCachedOverlay(log, wcb),
|
||||
(cachedOverlay, wcb) => patchConfiguration(cachedOverlay,
|
||||
log, wcb),
|
||||
(cachedOverlay, wcb) =>
|
||||
loadRemoteOverlay(managementEndpoint, instanceId, remoteToken,
|
||||
cachedOverlay, log, wcb),
|
||||
(cachedOverlay, remoteOverlay, wcb) =>
|
||||
saveConfigurationVersion(cachedOverlay, remoteOverlay, log, wcb),
|
||||
(remoteOverlay, wcb) => patchConfiguration(remoteOverlay,
|
||||
log, wcb),
|
||||
wcb => {
|
||||
if (cachedOverlay) {
|
||||
wcb(null, cachedOverlay);
|
||||
} else {
|
||||
loadCachedOverlay(log, wcb);
|
||||
}
|
||||
},
|
||||
overlay => { cachedOverlay = overlay; },
|
||||
wcb => loadRemoteOverlay(instanceId, remoteToken,
|
||||
log, wcb),
|
||||
remoteOverlay => {
|
||||
cachedOverlay = remoteOverlay;
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(JSON.stringify(cachedOverlay));
|
||||
}
|
||||
},
|
||||
], error => {
|
||||
if (error) {
|
||||
log.error('could not apply managed configuration',
|
||||
|
@ -60,12 +66,11 @@ function applyConfigurationOverlay(
|
|||
method: 'applyConfigurationOverlay' });
|
||||
}
|
||||
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
|
||||
managementEndpoint, instanceId, remoteToken,
|
||||
logger.newRequestLogger());
|
||||
instanceId, remoteToken, logger.newRequestLogger());
|
||||
});
|
||||
}
|
||||
|
||||
function postStats(managementEndpoint, instanceId, remoteToken, next) {
|
||||
function postStats(instanceId, remoteToken, next) {
|
||||
const toURL = `${managementEndpoint}/${instanceId}/stats`;
|
||||
const toOptions = {
|
||||
headers: {
|
||||
|
@ -99,14 +104,19 @@ function getStats() {
|
|||
return request(fromURL, fromOptions).json();
|
||||
}
|
||||
|
||||
function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
||||
function pushStats(instanceId, remoteToken, next) {
|
||||
if (process.env.PUSH_STATS === 'false') {
|
||||
return;
|
||||
}
|
||||
getStats().pipe(
|
||||
postStats(managementEndpoint, instanceId, remoteToken, next));
|
||||
setTimeout(pushStats, pushReportDelay,
|
||||
managementEndpoint, instanceId, remoteToken);
|
||||
getStats()
|
||||
.on('error', err => {
|
||||
/* If the management process launches too quick, the cloud server may
|
||||
* not be listening yet. */
|
||||
logger.info('failed to get stats', { err });
|
||||
})
|
||||
.pipe(
|
||||
postStats(instanceId, remoteToken, next));
|
||||
setTimeout(pushStats, pushReportDelay, instanceId, remoteToken);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -115,20 +125,23 @@ function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
|||
* Periodically polls for configuration updates, and pushes stats at
|
||||
* a fixed interval.
|
||||
*
|
||||
* @param {string} managementEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} remoteToken API authentication token
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||
* loaded (overlay)
|
||||
*
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function startPollingManagementClient(
|
||||
managementEndpoint, instanceId, remoteToken) {
|
||||
instanceId, remoteToken, newOverlayCallback) {
|
||||
overlayMessageListener = newOverlayCallback;
|
||||
|
||||
metadata.notifyBucketChange(() => {
|
||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
||||
pushStats(instanceId, remoteToken);
|
||||
});
|
||||
|
||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
||||
applyConfigurationOverlay(managementEndpoint, instanceId, remoteToken,
|
||||
pushStats(instanceId, remoteToken);
|
||||
applyConfigurationOverlay(instanceId, remoteToken,
|
||||
logger.newRequestLogger());
|
||||
}
|
||||
|
||||
|
|
|
@ -4,19 +4,16 @@ const net = require('net');
|
|||
const request = require('request');
|
||||
const { URL } = require('url');
|
||||
const WebSocket = require('ws');
|
||||
const assert = require('assert');
|
||||
|
||||
const _config = require('../Config').config;
|
||||
const logger = require('../utilities/logger');
|
||||
const metadata = require('../metadata/wrapper');
|
||||
|
||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||
const { isManagementAgentUsed } = require('./agentClient');
|
||||
const { applyAndSaveOverlay } = require('./configuration');
|
||||
const {
|
||||
ChannelMessageV0,
|
||||
MessageType,
|
||||
} = require('./ChannelMessageV0');
|
||||
const { pushEndpoint } = require('./constants');
|
||||
|
||||
const {
|
||||
CONFIG_OVERLAY_MESSAGE,
|
||||
|
@ -65,19 +62,23 @@ function createWSAgent(pushEndpoint, env, log) {
|
|||
return null;
|
||||
}
|
||||
|
||||
function pushEndpointUrlFromInstanceId(instanceId) {
|
||||
return `${pushEndpoint}/${instanceId}/ws`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts background task that updates configuration and pushes stats.
|
||||
*
|
||||
* Receives pushed Websocket messages on configuration updates, and
|
||||
* sends stat messages in response to API sollicitations.
|
||||
*
|
||||
* @param {string} pushEndpoint API endpoint
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} token API authentication token
|
||||
*
|
||||
* @param {string} instanceId UUID of this deployment
|
||||
* @param {string} token API authentication token
|
||||
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||
* loaded (overlay)
|
||||
* @returns {undefined}
|
||||
*/
|
||||
function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||
function startWSManagementClient(instanceId, token, newOverlayCallback) {
|
||||
logger.info('connecting to push server');
|
||||
function _logError(error, errorMessage, method) {
|
||||
if (error) {
|
||||
|
@ -86,14 +87,18 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
}
|
||||
}
|
||||
|
||||
overlayMessageListener = newOverlayCallback;
|
||||
|
||||
const socketsByChannelId = [];
|
||||
const headers = {
|
||||
'x-instance-authentication-token': token,
|
||||
};
|
||||
const agent = createWSAgent(pushEndpoint, process.env, logger);
|
||||
|
||||
const url = `${pushEndpoint}/${instanceId}/ws`;
|
||||
const url = pushEndpointUrlFromInstanceId(instanceId);
|
||||
|
||||
const ws = new WebSocket(url, subprotocols, { headers, agent });
|
||||
|
||||
let pingTimeout = null;
|
||||
|
||||
function sendPing() {
|
||||
|
@ -112,7 +117,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
if (process.env.PUSH_STATS === 'false') {
|
||||
return;
|
||||
}
|
||||
const fromURL = `http://localhost:${_config.port}/_/report`;
|
||||
const fromURL = process.env.STAT_REPORT_URL ||
|
||||
`http://localhost:${_config.port}/_/report`;
|
||||
const fromOptions = {
|
||||
headers: {
|
||||
'x-scal-report-token': process.env.REPORT_TOKEN,
|
||||
|
@ -121,7 +127,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
};
|
||||
request(fromURL, fromOptions, (err, response, body) => {
|
||||
if (err) {
|
||||
_logError(err, 'failed to get metrics report', 'pushStats');
|
||||
const what = 'failed to get metrics report';
|
||||
const msg =
|
||||
`${what} at ${fromURL} with headers ${fromOptions}`;
|
||||
_logError(err, msg, 'pushStats');
|
||||
return;
|
||||
}
|
||||
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
||||
|
@ -204,8 +213,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
metadata.notifyBucketChange(null);
|
||||
_config.removeListener('browser-access-enabled-change',
|
||||
browserAccessChangeHandler);
|
||||
setTimeout(startWSManagementClient, 10000, pushEndpoint,
|
||||
instanceId, token);
|
||||
const timeout = process.env.ORBIT_CONNECTION_TIMEOUT_MS || 10000;
|
||||
|
||||
setTimeout(startWSManagementClient, timeout, instanceId, token,
|
||||
newOverlayCallback);
|
||||
});
|
||||
|
||||
ws.on('error', err => {
|
||||
|
@ -224,17 +235,12 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
});
|
||||
|
||||
ws.on('message', data => {
|
||||
const log = logger.newRequestLogger();
|
||||
const message = new ChannelMessageV0(data);
|
||||
|
||||
switch (message.getType()) {
|
||||
case CONFIG_OVERLAY_MESSAGE:
|
||||
if (!isManagementAgentUsed()) {
|
||||
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
|
||||
} else {
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(message.getPayload().toString());
|
||||
}
|
||||
if (overlayMessageListener) {
|
||||
overlayMessageListener(message.getPayload().toString());
|
||||
}
|
||||
break;
|
||||
case METRICS_REQUEST_MESSAGE:
|
||||
|
@ -256,13 +262,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
|||
});
|
||||
}
|
||||
|
||||
function addOverlayMessageListener(callback) {
|
||||
assert(typeof callback === 'function');
|
||||
overlayMessageListener = callback;
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
createWSAgent,
|
||||
startWSManagementClient,
|
||||
addOverlayMessageListener,
|
||||
pushEndpointUrlFromInstanceId,
|
||||
};
|
||||
|
|
|
@ -14,11 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
|
|||
const api = require('./api/api');
|
||||
const data = require('./data/wrapper');
|
||||
const metadata = require('./metadata/wrapper');
|
||||
const { initManagement } = require('./management');
|
||||
const {
|
||||
initManagementClient,
|
||||
isManagementAgentUsed,
|
||||
} = require('./management/agentClient');
|
||||
const { initManagementClient } = require('./management/agentClient');
|
||||
|
||||
const routes = arsenal.s3routes.routes;
|
||||
const websiteEndpoints = _config.websiteEndpoints;
|
||||
|
@ -40,7 +36,6 @@ const STATS_INTERVAL = 5; // 5 seconds
|
|||
const STATS_EXPIRY = 30; // 30 seconds
|
||||
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
|
||||
STATS_EXPIRY);
|
||||
const enableRemoteManagement = true;
|
||||
|
||||
class S3Server {
|
||||
/**
|
||||
|
@ -153,15 +148,7 @@ class S3Server {
|
|||
|
||||
// TODO this should wait for metadata healthcheck to be ok
|
||||
// TODO only do this in cluster master
|
||||
if (enableRemoteManagement) {
|
||||
if (!isManagementAgentUsed()) {
|
||||
setTimeout(() => {
|
||||
initManagement(logger.newRequestLogger());
|
||||
}, 5000);
|
||||
} else {
|
||||
initManagementClient();
|
||||
}
|
||||
}
|
||||
initManagementClient();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -5,19 +5,20 @@ const logger = require('./lib/utilities/logger');
|
|||
const { initManagement } = require('./lib/management');
|
||||
const _config = require('./lib/Config').config;
|
||||
const { managementAgentMessageType } = require('./lib/management/agentClient');
|
||||
const { addOverlayMessageListener } = require('./lib/management/push');
|
||||
const { saveConfigurationVersion } = require('./lib/management/configuration');
|
||||
const {
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||
WS_STATUS_IDLE,
|
||||
} = require('./lib/management/constants');
|
||||
|
||||
|
||||
// TODO: auth?
|
||||
// TODO: werelogs with a specific name.
|
||||
|
||||
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
|
||||
|
||||
|
||||
class ManagementAgentServer {
|
||||
constructor() {
|
||||
this.port = _config.managementAgent.port || 8010;
|
||||
this.port = _config.managementAgent.port || 9753;
|
||||
this.wss = null;
|
||||
this.loadedOverlay = null;
|
||||
|
||||
|
@ -29,25 +30,21 @@ class ManagementAgentServer {
|
|||
process.on('SIGPIPE', () => {});
|
||||
}
|
||||
|
||||
start(_cb) {
|
||||
const cb = _cb || function noop() {};
|
||||
|
||||
start() {
|
||||
/* Define REPORT_TOKEN env variable needed by the management
|
||||
* module. */
|
||||
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|
||||
|| _config.reportToken
|
||||
|| Uuid.v4();
|
||||
|
||||
initManagement(logger.newRequestLogger(), overlay => {
|
||||
let error = null;
|
||||
|
||||
if (overlay) {
|
||||
this.loadedOverlay = overlay;
|
||||
this.startServer();
|
||||
} else {
|
||||
error = new Error('failed to init management');
|
||||
/* The initManegement function retries when it fails. */
|
||||
const log = logger.newRequestLogger();
|
||||
initManagement(log, this.onNewOverlay.bind(this), (err, overlay) => {
|
||||
if (err) {
|
||||
process.exit(0);
|
||||
}
|
||||
return cb(error);
|
||||
this.loadedOverlay = overlay;
|
||||
this.startServer();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -75,8 +72,6 @@ class ManagementAgentServer {
|
|||
|
||||
setInterval(this.checkBrokenConnections.bind(this),
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||
|
||||
addOverlayMessageListener(this.onNewOverlay.bind(this));
|
||||
}
|
||||
|
||||
onConnection(socket, request) {
|
||||
|
@ -154,6 +149,14 @@ class ManagementAgentServer {
|
|||
return;
|
||||
}
|
||||
this.loadedOverlay = remoteOverlayObj;
|
||||
if (this.loadedOverlay.browserAccess) {
|
||||
if (Boolean(_config.browserAccessEnabled) !==
|
||||
Boolean(this.loadedOverlay.browserAccess.enabled)) {
|
||||
_config.browserAccessEnabled =
|
||||
Boolean(this.loadedOverlay.browserAccess.enabled);
|
||||
_config.emit('browser-access-enabled-change');
|
||||
}
|
||||
}
|
||||
this.wss.clients.forEach(
|
||||
this._sendNewOverlayToClient.bind(this)
|
||||
);
|
||||
|
@ -166,7 +169,7 @@ class ManagementAgentServer {
|
|||
logger.info('close broken connection', {
|
||||
client: client._socket._peername,
|
||||
});
|
||||
client.terminate();
|
||||
client.close(WS_STATUS_IDLE.code, WS_STATUS_IDLE.reason);
|
||||
return;
|
||||
}
|
||||
client.isAlive = false;
|
||||
|
|
|
@ -84,17 +84,19 @@
|
|||
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
|
||||
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
|
||||
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
|
||||
"ft_management_agent": "cd tests/functional/managementAgent/ && npm test",
|
||||
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
|
||||
"lint": "eslint $(git ls-files '*.js')",
|
||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||
"mem_backend": "S3BACKEND=mem node index.js",
|
||||
"perf": "mocha tests/performance/s3standard.js",
|
||||
"start": "npm-run-all --parallel start_dmd start_s3server",
|
||||
"start": "npm-run-all --parallel start_dmd start_management start_s3server",
|
||||
"start_mongo": "npm run cloudserver",
|
||||
"start_mdserver": "node mdserver.js",
|
||||
"start_dataserver": "node dataserver.js",
|
||||
"start_s3server": "node index.js",
|
||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||
"start_management": "node managementAgent.js",
|
||||
"start_utapi": "node lib/utapi/utapi.js",
|
||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
||||
"management_agent": "node managementAgent.js",
|
||||
|
|
|
@ -0,0 +1,425 @@
|
|||
'use strict'; // eslint-disable-line strict
|
||||
const assert = require('assert');
|
||||
const EventEmitter = require('events');
|
||||
const http = require('http');
|
||||
const net = require('net');
|
||||
const URL = require('url');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const _config = require('../../../lib/Config').config;
|
||||
const {
|
||||
managementAgentMessageType,
|
||||
} = require('../../../lib/management/agentClient');
|
||||
const {
|
||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||
WS_STATUS_IDLE,
|
||||
} = require('../../../lib/management/constants');
|
||||
const {
|
||||
OrbitSimulatorServer,
|
||||
OrbitSimulatorEvent,
|
||||
} = require('./orbitServerSimulator');
|
||||
const {
|
||||
ChannelMessageV0,
|
||||
MessageType,
|
||||
} = require('../../../lib/management/ChannelMessageV0');
|
||||
|
||||
|
||||
const PAYLOAD_MSG_TYPE = MessageType.CHANNEL_PAYLOAD_MESSAGE;
|
||||
const CONFIG_OVERLAY_MSG_TYPE = MessageType.CONFIG_OVERLAY_MESSAGE;
|
||||
const METRICS_REQ_MSG_TYPE = MessageType.METRICS_REQUEST_MESSAGE;
|
||||
const METRICS_REPORT_MSG_TYPE = MessageType.METRICS_REPORT_MESSAGE;
|
||||
const CHAN_CLOSE_MSG_TYPE = MessageType.CHANNEL_CLOSE_MESSAGE;
|
||||
|
||||
|
||||
function testWithBrowserAccessEnabled() {
|
||||
before(done => {
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
const overlay = { browserAccess: { enabled: false } };
|
||||
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
|
||||
orbitSimulator.stop(done);
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
it('should not forward payload message if no overlay has been received',
|
||||
done => {
|
||||
const chanId = 1;
|
||||
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
|
||||
/* Payload message socket server. The management process forwards
|
||||
* payload message from Orbit. */
|
||||
const server = new net.Server();
|
||||
server.on('connection', () => {
|
||||
done(new Error('should not connect to payload message server'));
|
||||
});
|
||||
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||
|
||||
/* Once the management process is connected to orbit, send it a
|
||||
* payload message. It is supposed to create the channel ID socket
|
||||
* with the payload message server. */
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
|
||||
setTimeout(() => {
|
||||
server.close(() => {
|
||||
orbitSimulator.stop(done);
|
||||
});
|
||||
}, 4000);
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
/* XXX: this test set the managementProcess config browserAccessEnabled to
|
||||
* true, allowing it to forward payload message. This internal state value
|
||||
* is required for the following tests. */
|
||||
it('should close a channel ID when requested', done => {
|
||||
const chanId = 1;
|
||||
let closeMessageSent = false;
|
||||
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
|
||||
/* Payload message socket server. The management process forwards
|
||||
* payload message from Orbit. */
|
||||
const server = new net.Server();
|
||||
server.on('connection', s => {
|
||||
s.on('close', hadError => {
|
||||
/* Make sure the connection had been closed after the CLOSE
|
||||
* message has been sent by Orbit and there is no error. */
|
||||
assert.strictEqual(hadError, false);
|
||||
assert.strictEqual(closeMessageSent, true);
|
||||
|
||||
server.close(() => { orbitSimulator.stop(done); });
|
||||
});
|
||||
s.on('data', () => {
|
||||
/* Once the payload message has been forwarded from Orbit
|
||||
* to the socket server by the management process, make
|
||||
* Orbit request this channel close. */
|
||||
closeMessageSent = true;
|
||||
orbitSimulator.sendMessage(CHAN_CLOSE_MSG_TYPE, '', chanId);
|
||||
});
|
||||
});
|
||||
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||
|
||||
/* Once the management process is connected to orbit, send it a
|
||||
* payload message to create the channel ID with the payload
|
||||
* message server. */
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
const overlay = { browserAccess: { enabled: true } };
|
||||
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
|
||||
setTimeout(() => {
|
||||
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
|
||||
}, 1000);
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
/* When the process management receives a payload message associated to
|
||||
* a specific channel ID from Orbit, it forwards this message to a
|
||||
* configured host/port socket.
|
||||
* When it received data on this socket, it forwards it to Orbit. */
|
||||
it('should forward payload message', done => {
|
||||
const payload = 'payload';
|
||||
const chanId = 1;
|
||||
let socket = null;
|
||||
|
||||
/* Payload message socket server. The management process forwards
|
||||
* payload message from Orbit. */
|
||||
const server = new net.Server();
|
||||
server.on('connection', s => {
|
||||
socket = s;
|
||||
|
||||
/* After receiving the payload message sent by Orbit and
|
||||
* forwarded by the management process, reply on the socket. */
|
||||
socket.on('data', data => {
|
||||
assert.strictEqual(data.toString(), payload);
|
||||
|
||||
socket.write(payload);
|
||||
});
|
||||
});
|
||||
|
||||
/* Once the management process is connected to orbit, send it a
|
||||
* payload message. */
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId);
|
||||
});
|
||||
|
||||
/* Check orbit received the message. */
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||
const message = new ChannelMessageV0(data);
|
||||
|
||||
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
|
||||
assert.strictEqual(message.payload.toString(), payload);
|
||||
assert.strictEqual(message.getChannelNumber(), chanId);
|
||||
|
||||
socket.end();
|
||||
server.close(() => { orbitSimulator.stop(done); });
|
||||
});
|
||||
|
||||
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
it('should use one TCP socket per channel ID', done => {
|
||||
const payload = 'payload';
|
||||
const chanId1 = 1;
|
||||
const chanId2 = 2;
|
||||
const evtEmitter = new EventEmitter();
|
||||
const clients = [];
|
||||
const payloadMsgFwdedEvent = 'payloadMessageForwarded';
|
||||
let socketServerFirstMsgReceived = false;
|
||||
let orbitFirstMsgReceived = false;
|
||||
|
||||
/* Payload message socket server. The management process forwards
|
||||
* payload message from Orbit. */
|
||||
const server = new net.Server();
|
||||
server.on('connection', s => {
|
||||
assert.equal(clients.indexOf(s), -1);
|
||||
|
||||
clients.push(s);
|
||||
s.on('data', () => { evtEmitter.emit(payloadMsgFwdedEvent); });
|
||||
});
|
||||
|
||||
/* Once the management process is connected to orbit, send it a
|
||||
* payload message. */
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId1);
|
||||
});
|
||||
|
||||
/* When the socket server receives the 1st payload, send an other
|
||||
* one with a different channel id. In both cases, reply with a
|
||||
* payload. */
|
||||
evtEmitter.on(payloadMsgFwdedEvent, () => {
|
||||
if (!socketServerFirstMsgReceived) {
|
||||
assert.equal(clients.length, 1);
|
||||
|
||||
socketServerFirstMsgReceived = true;
|
||||
clients[0].write(payload);
|
||||
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload,
|
||||
chanId2);
|
||||
return;
|
||||
}
|
||||
|
||||
assert.equal(clients.length, 2);
|
||||
clients[1].write(payload);
|
||||
});
|
||||
|
||||
/* Check orbit received the 2 messages. */
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||
const message = new ChannelMessageV0(data);
|
||||
|
||||
if (!orbitFirstMsgReceived) {
|
||||
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
|
||||
assert.strictEqual(message.payload.toString(), payload);
|
||||
assert.strictEqual(message.getChannelNumber(), chanId1);
|
||||
|
||||
orbitFirstMsgReceived = true;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Ignore CHANNEL_CLOSE_MESSAGE */
|
||||
if (message.getType() !== PAYLOAD_MSG_TYPE) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert.strictEqual(message.payload.toString(), payload);
|
||||
assert.strictEqual(message.getChannelNumber(), chanId2);
|
||||
|
||||
clients.forEach(client => { client.end(); });
|
||||
server.close(() => { orbitSimulator.stop(done); });
|
||||
});
|
||||
|
||||
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||
orbitSimulator.start();
|
||||
});
|
||||
}
|
||||
|
||||
/* Test the management process.
|
||||
*
|
||||
* This process connects to Orbit WebSocket server and is itself a WebSocket
|
||||
* server. Testing it requires:
|
||||
* - an orbit simulator,
|
||||
* - a WebSocket client to connect to it,
|
||||
* - HTTP server to answer the stats requests,
|
||||
* - a socket server for payload messages.
|
||||
**/
|
||||
describe('Management process', function testSuite() {
|
||||
this.timeout(120000);
|
||||
|
||||
/* Make sure the process management send payload message to a local host
|
||||
* socket to be able to receive this message in this test. */
|
||||
assert.strictEqual(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST,
|
||||
'localhost');
|
||||
|
||||
function createWs(path) {
|
||||
const host = _config.managementAgent.host;
|
||||
const port = _config.managementAgent.port;
|
||||
const url = `ws://${host}:${port}/${path || 'watch'}`;
|
||||
return new WebSocket(url);
|
||||
}
|
||||
|
||||
it('should not listen on others routes than `watch`', done => {
|
||||
const ws = createWs('wrong_path');
|
||||
const msg = 'management agent process should not listen this route';
|
||||
|
||||
ws.on('open', () => { done(new Error(msg)); });
|
||||
ws.on('unexpected-response', (_, response) => {
|
||||
assert.strictEqual(response.statusCode, 400);
|
||||
|
||||
done();
|
||||
});
|
||||
});
|
||||
|
||||
it('should listen on `watch` route', done => {
|
||||
const ws = createWs();
|
||||
|
||||
ws.on('open', done);
|
||||
ws.on('error', error => { done(error); });
|
||||
});
|
||||
|
||||
it('should terminate the connection when a client does not answer ping',
|
||||
done => {
|
||||
this.timeout(2 * CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||
|
||||
const ws = createWs();
|
||||
|
||||
ws.on('close', (code, reason) => {
|
||||
assert.strictEqual(code, WS_STATUS_IDLE.code);
|
||||
assert.strictEqual(reason, WS_STATUS_IDLE.reason);
|
||||
|
||||
done();
|
||||
});
|
||||
|
||||
ws.on('error', error => { done(error); });
|
||||
|
||||
ws.on('message', () => {
|
||||
/* Ugly eventTarget internal fields hacking to avoid this web
|
||||
* socket to answer to ping messages. It will make
|
||||
* the management agent to close the connection after a timeout.
|
||||
* Defining an onPing event does not help, this internal
|
||||
* function is still called. */
|
||||
ws._receiver._events.ping = function noop() {};
|
||||
});
|
||||
});
|
||||
|
||||
it('should connect to orbit', done => {
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
orbitSimulator.stop(done);
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
it('should save the last overlay and send it to client on connection',
|
||||
done => {
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
/* Send an overlay to management process. */
|
||||
const body = 'body';
|
||||
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, body);
|
||||
|
||||
/* Connect to the process manager and check its saved
|
||||
* overlay. */
|
||||
const ws = createWs();
|
||||
ws.on('error', error => { done(new Error(error)); });
|
||||
ws.on('message', data => {
|
||||
const msg = JSON.parse(data);
|
||||
const type = managementAgentMessageType.NEW_OVERLAY;
|
||||
|
||||
assert.strictEqual(msg.messageType, type);
|
||||
assert.strictEqual(msg.payload.toString(), body);
|
||||
|
||||
ws.terminate();
|
||||
orbitSimulator.stop(done);
|
||||
});
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
it('should send new overlay to its client', done => {
|
||||
const body = 'body';
|
||||
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
/* Connect to management process to receive the first overlay.
|
||||
**/
|
||||
const ws = createWs();
|
||||
let firstMsgReceived = false;
|
||||
|
||||
ws.on('error', () => { done(new Error('connection error')); });
|
||||
ws.on('message', data => {
|
||||
const msg = JSON.parse(data);
|
||||
|
||||
if (!firstMsgReceived) {
|
||||
firstMsgReceived = true;
|
||||
/* Send a new overlay to management process. */
|
||||
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE,
|
||||
body);
|
||||
return;
|
||||
}
|
||||
|
||||
/* Check we receive the second overlay. */
|
||||
assert.strictEqual(msg.payload.toString(), body);
|
||||
|
||||
ws.terminate();
|
||||
orbitSimulator.stop(done);
|
||||
});
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
it('should get and send stats to orbit on stat requests', done => {
|
||||
const stats = { stats: 'stats' };
|
||||
|
||||
/* Mock stats server, it returns a JSON object stringified on
|
||||
* request on the stats path. */
|
||||
const url = URL.parse(process.env.STAT_REPORT_URL);
|
||||
const statServer = http.createServer((request, response) => {
|
||||
if (request.url !== url.pathname) {
|
||||
response.writeHead(400, { 'Content-type': 'text/plan' });
|
||||
response.write('bad path');
|
||||
response.end();
|
||||
return;
|
||||
}
|
||||
response.writeHead(200, { 'Content-type': 'text/plan' });
|
||||
response.write(JSON.stringify(stats));
|
||||
response.end();
|
||||
});
|
||||
statServer.listen(url.port);
|
||||
|
||||
/* Once the management process is connected to orbit, send it a
|
||||
* stat request. */
|
||||
const orbitSimulator = new OrbitSimulatorServer();
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||
orbitSimulator.sendMessage(METRICS_REQ_MSG_TYPE, 'data');
|
||||
});
|
||||
|
||||
/* And finally check the management process replies by a metrics
|
||||
* report message. */
|
||||
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||
const message = new ChannelMessageV0(data);
|
||||
|
||||
assert.strictEqual(message.getType(), METRICS_REPORT_MSG_TYPE);
|
||||
assert.deepStrictEqual(JSON.parse(message.payload.toString()),
|
||||
stats);
|
||||
|
||||
statServer.close(() => { orbitSimulator.stop(done); });
|
||||
});
|
||||
orbitSimulator.start();
|
||||
});
|
||||
|
||||
/* Test suite which requires the management process to have received an
|
||||
* overlay with the browserAccessEnabled set to true, otherwise payload
|
||||
* message are not forwarded. This test suite is separated from the
|
||||
* other tests because of its internal requirement. As long as there is
|
||||
* no tests launching/stoping the management process there is no
|
||||
* better way to test this. Anonymous function not used here to save an
|
||||
* indentation level. */
|
||||
describe('Management process with browser access enabled',
|
||||
testWithBrowserAccessEnabled);
|
||||
});
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"tests": {
|
||||
"files": [ "/test" ],
|
||||
"on": "aggressor"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,126 @@
|
|||
const assert = require('assert');
|
||||
const { EventEmitter } = require('events');
|
||||
const url = require('url');
|
||||
const WebSocket = require('ws');
|
||||
|
||||
const logger = require('../../../lib/utilities/logger');
|
||||
const OrbitSimulatorEvent = {
|
||||
EVENT_LISTENING: 'listening',
|
||||
EVENT_CONNECTION: 'connection',
|
||||
EVENT_MESSAGE: 'message',
|
||||
};
|
||||
const {
|
||||
ChannelMessageV0,
|
||||
MessageType,
|
||||
} = require('../../../lib/management/ChannelMessageV0');
|
||||
const {
|
||||
pushEndpointUrlFromInstanceId,
|
||||
} = require('../../../lib/management/push');
|
||||
|
||||
|
||||
class OrbitSimulatorServer extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
|
||||
assert(process.env.INITIAL_INSTANCE_ID,
|
||||
'INITIAL_INSTANCE_ID env variable is required');
|
||||
const instanceId = process.env.INITIAL_INSTANCE_ID;
|
||||
|
||||
const endpointUrl = pushEndpointUrlFromInstanceId(instanceId);
|
||||
const endpointUrlObj = url.parse(endpointUrl);
|
||||
|
||||
assert(endpointUrlObj.host, 'localhost');
|
||||
|
||||
this.wss = null;
|
||||
this.port = endpointUrlObj.port;
|
||||
this.path = endpointUrlObj.path;
|
||||
|
||||
this.stop = this.stop.bind(this);
|
||||
process.on('SIGINT', this.stop);
|
||||
process.on('SIGHUP', this.stop);
|
||||
process.on('SIGQUIT', this.stop);
|
||||
process.on('SIGTERM', this.stop);
|
||||
process.on('SIGPIPE', () => {});
|
||||
}
|
||||
|
||||
start() {
|
||||
this.wss = new WebSocket.Server({
|
||||
port: this.port,
|
||||
path: this.path,
|
||||
clientTracking: true,
|
||||
});
|
||||
|
||||
this.wss.on('connection', clientWs => {
|
||||
clientWs.on('message', data => {
|
||||
this.emit(OrbitSimulatorEvent.EVENT_MESSAGE, data);
|
||||
});
|
||||
this.emit(OrbitSimulatorEvent.EVENT_CONNECTION);
|
||||
});
|
||||
this.wss.on('listening', () => {
|
||||
this.emit(OrbitSimulatorEvent.EVENT_LISTENING);
|
||||
});
|
||||
this.wss.on('error', error => {
|
||||
logger.error('orbit CI simulator error', { error });
|
||||
});
|
||||
}
|
||||
|
||||
stop(cb) {
|
||||
if (!this.wss) {
|
||||
if (cb) {
|
||||
cb();
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.wss.close(() => {
|
||||
if (cb) {
|
||||
cb();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
sendMessage(messageType, body, channelId) {
|
||||
let message = null;
|
||||
|
||||
switch (messageType) {
|
||||
case MessageType.CONFIG_OVERLAY_MESSAGE:
|
||||
assert(body);
|
||||
message = ChannelMessageV0.encodeConfigOverlayMessage(body);
|
||||
break;
|
||||
|
||||
case MessageType.METRICS_REQUEST_MESSAGE:
|
||||
assert(body);
|
||||
message = ChannelMessageV0.encodeMetricsRequestMessage(body);
|
||||
break;
|
||||
|
||||
case MessageType.METRICS_REPORT_MESSAGE:
|
||||
assert(body);
|
||||
message = ChannelMessageV0.encodeMetricsReportMessage(body);
|
||||
break;
|
||||
|
||||
case MessageType.CHANNEL_CLOSE_MESSAGE:
|
||||
assert(channelId);
|
||||
message = ChannelMessageV0.encodeChannelCloseMessage(channelId);
|
||||
break;
|
||||
|
||||
case MessageType.CHANNEL_PAYLOAD_MESSAGE:
|
||||
assert(body);
|
||||
assert(channelId);
|
||||
message = ChannelMessageV0.encodeChannelDataMessage(
|
||||
channelId,
|
||||
Buffer.from(body)
|
||||
);
|
||||
break;
|
||||
|
||||
default:
|
||||
logger.error('unsupported message type', { messageType });
|
||||
return;
|
||||
}
|
||||
|
||||
this.wss.clients.forEach(client => { client.send(message); });
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
OrbitSimulatorServer,
|
||||
OrbitSimulatorEvent,
|
||||
};
|
|
@ -0,0 +1,6 @@
|
|||
{
|
||||
"name": "management-agent-tests",
|
||||
"scripts": {
|
||||
"test": "mocha -t 40000 *.js"
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue