Compare commits
11 Commits
developmen
...
feature/ZE
Author | SHA1 | Date |
---|---|---|
jeremyds | 240d16a280 | |
jeremyds | ce8e01f37c | |
jeremyds | 81181892d5 | |
jeremyds | b3b371d597 | |
jeremyds | 1804d8b10a | |
jeremyds | 84c51bdac8 | |
jeremyds | 9f6c5fac73 | |
jeremyds | 5a578b3ef0 | |
jeremyds | 5dc99e24aa | |
jeremyds | 052628a9d4 | |
jeremyds | 37a4fc30e0 |
|
@ -26,3 +26,4 @@ ENTRYPOINT ["/usr/src/app/docker-entrypoint.sh"]
|
||||||
CMD [ "npm", "start" ]
|
CMD [ "npm", "start" ]
|
||||||
|
|
||||||
EXPOSE 8000
|
EXPOSE 8000
|
||||||
|
EXPOSE 9753
|
||||||
|
|
45
eve/main.yml
45
eve/main.yml
|
@ -65,6 +65,16 @@ models:
|
||||||
S3BACKEND: "mem"
|
S3BACKEND: "mem"
|
||||||
MPU_TESTING: "yes"
|
MPU_TESTING: "yes"
|
||||||
S3METADATA: mongodb
|
S3METADATA: mongodb
|
||||||
|
- env: &management-vars
|
||||||
|
S3BACKEND: "file"
|
||||||
|
S3METADATA: mongodb
|
||||||
|
PUSH_ENDPOINT: "http://localhost:9989/endpoint"
|
||||||
|
STAT_REPORT_URL: "http://localhost:8081/stat"
|
||||||
|
SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST: "localhost"
|
||||||
|
SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT: "8082"
|
||||||
|
MANAGEMENT_CHECK_CLIENT_FQCY_MS: "2000"
|
||||||
|
ORBIT_CONNECTION_TIMEOUT_MS: "1000"
|
||||||
|
MANAGEMENT_MODE: "push"
|
||||||
- env: &multiple-backend-vars
|
- env: &multiple-backend-vars
|
||||||
S3BACKEND: "mem"
|
S3BACKEND: "mem"
|
||||||
S3DATA: "multiple"
|
S3DATA: "multiple"
|
||||||
|
@ -121,6 +131,7 @@ stages:
|
||||||
- file-ft-tests
|
- file-ft-tests
|
||||||
- multiple-backend-test
|
- multiple-backend-test
|
||||||
- mongo-ft-tests
|
- mongo-ft-tests
|
||||||
|
- management-agent-tests
|
||||||
waitForFinish: True
|
waitForFinish: True
|
||||||
haltOnFailure: True
|
haltOnFailure: True
|
||||||
|
|
||||||
|
@ -165,6 +176,7 @@ stages:
|
||||||
vars:
|
vars:
|
||||||
aggressorMemLimit: "2Gi"
|
aggressorMemLimit: "2Gi"
|
||||||
s3MemLimit: "2Gi"
|
s3MemLimit: "2Gi"
|
||||||
|
managementProcess: disabled
|
||||||
env:
|
env:
|
||||||
<<: *multiple-backend-vars
|
<<: *multiple-backend-vars
|
||||||
<<: *global-env
|
<<: *global-env
|
||||||
|
@ -209,6 +221,7 @@ stages:
|
||||||
aggressorMemLimit: "2Gi"
|
aggressorMemLimit: "2Gi"
|
||||||
s3MemLimit: "1664Mi"
|
s3MemLimit: "1664Mi"
|
||||||
redis: enabled
|
redis: enabled
|
||||||
|
managementProcess: disabled
|
||||||
env:
|
env:
|
||||||
<<: *mongo-vars
|
<<: *mongo-vars
|
||||||
<<: *global-env
|
<<: *global-env
|
||||||
|
@ -227,6 +240,37 @@ stages:
|
||||||
<<: *global-env
|
<<: *global-env
|
||||||
- Upload: *upload-artifacts
|
- Upload: *upload-artifacts
|
||||||
|
|
||||||
|
management-agent-tests:
|
||||||
|
worker: &management-pod
|
||||||
|
type: kube_pod
|
||||||
|
path: eve/workers/pod.yaml
|
||||||
|
images:
|
||||||
|
aggressor: eve/workers/build
|
||||||
|
management_agent: "."
|
||||||
|
vars:
|
||||||
|
aggressorMemLimit: "1Gi"
|
||||||
|
managementProcess: enabled
|
||||||
|
env:
|
||||||
|
<<: *management-vars
|
||||||
|
<<: *global-env
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- Git: *clone
|
||||||
|
- ShellCommand: *credentials
|
||||||
|
- ShellCommand: *npm-install
|
||||||
|
- ShellCommand:
|
||||||
|
command: |
|
||||||
|
bash -c "
|
||||||
|
set -ex
|
||||||
|
bash wait_for_local_port.bash 9753 40
|
||||||
|
source /artifacts/management_test_instance_id
|
||||||
|
npm run ft_management_agent"
|
||||||
|
env:
|
||||||
|
<<: *mongo-vars
|
||||||
|
<<: *management-vars
|
||||||
|
<<: *global-env
|
||||||
|
- Upload: *upload-artifacts
|
||||||
|
|
||||||
file-ft-tests:
|
file-ft-tests:
|
||||||
worker:
|
worker:
|
||||||
type: kube_pod
|
type: kube_pod
|
||||||
|
@ -238,6 +282,7 @@ stages:
|
||||||
aggressorMemLimit: "1920Mi"
|
aggressorMemLimit: "1920Mi"
|
||||||
s3MemLimit: "2Gi"
|
s3MemLimit: "2Gi"
|
||||||
redis: enabled
|
redis: enabled
|
||||||
|
managementProcess: disabled
|
||||||
env:
|
env:
|
||||||
<<: *file-mem-mpu
|
<<: *file-mem-mpu
|
||||||
<<: *global-env
|
<<: *global-env
|
||||||
|
|
|
@ -40,12 +40,13 @@ spec:
|
||||||
readOnly: false
|
readOnly: false
|
||||||
mountPath: /root/.aws
|
mountPath: /root/.aws
|
||||||
- name: artifacts
|
- name: artifacts
|
||||||
readOnly: true
|
readOnly: false
|
||||||
mountPath: /artifacts
|
mountPath: /artifacts
|
||||||
command:
|
command:
|
||||||
- bash
|
- bash
|
||||||
- -lc
|
- -lc
|
||||||
- |
|
- |
|
||||||
|
echo "export INITIAL_INSTANCE_ID=$(od -x /dev/urandom | head -1 | awk '{OFS="-"; print $2$3,$4,$5,$6,$7$8$9}')" > /artifacts/management_test_instance_id
|
||||||
buildbot-worker create-worker . $BUILDMASTER:$BUILDMASTER_PORT $WORKERNAME $WORKERPASS
|
buildbot-worker create-worker . $BUILDMASTER:$BUILDMASTER_PORT $WORKERNAME $WORKERPASS
|
||||||
buildbot-worker start --nodaemon
|
buildbot-worker start --nodaemon
|
||||||
env:
|
env:
|
||||||
|
@ -61,6 +62,7 @@ spec:
|
||||||
- name: {{ key }}
|
- name: {{ key }}
|
||||||
value: "{{ value }}"
|
value: "{{ value }}"
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
{% if vars.managementProcess is not defined or vars.managementProcess != "enabled" -%}
|
||||||
- name: s3
|
- name: s3
|
||||||
image: {{ images.s3 }}
|
image: {{ images.s3 }}
|
||||||
imagePullPolicy: IfNotPresent
|
imagePullPolicy: IfNotPresent
|
||||||
|
@ -114,6 +116,64 @@ spec:
|
||||||
- name: {{ key }}
|
- name: {{ key }}
|
||||||
value: "{{ value }}"
|
value: "{{ value }}"
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
{%- endif %}
|
||||||
|
{% if vars.managementProcess is defined and vars.managementProcess == "enabled" -%}
|
||||||
|
- name: management
|
||||||
|
image: {{ images.management_agent }}
|
||||||
|
imagePullPolicy: IfNotPresent
|
||||||
|
resources:
|
||||||
|
requests:
|
||||||
|
cpu: 100m
|
||||||
|
memory: 1Gi
|
||||||
|
limits:
|
||||||
|
cpu: 200m
|
||||||
|
memory: 1Gi
|
||||||
|
volumeMounts:
|
||||||
|
- name: creds
|
||||||
|
readOnly: false
|
||||||
|
mountPath: /root/.aws
|
||||||
|
- name: certs
|
||||||
|
readOnly: true
|
||||||
|
mountPath: /tmp
|
||||||
|
- name: artifacts
|
||||||
|
readOnly: false
|
||||||
|
mountPath: /artifacts
|
||||||
|
command:
|
||||||
|
- bash
|
||||||
|
- -ec
|
||||||
|
- |
|
||||||
|
sleep 10 # wait for mongo
|
||||||
|
source /artifacts/management_test_instance_id
|
||||||
|
/usr/src/app/docker-entrypoint.sh npm run management_agent | tee -a /artifacts/management.log
|
||||||
|
env:
|
||||||
|
{% if vars.env.S3DATA is defined and vars.env.S3DATA == "multiple" -%}
|
||||||
|
- name: S3_LOCATION_FILE
|
||||||
|
value: "/usr/src/app/tests/locationConfig/locationConfigTests.json"
|
||||||
|
{%- endif %}
|
||||||
|
- name: CI
|
||||||
|
value: "true"
|
||||||
|
- name: ENABLE_LOCAL_CACHE
|
||||||
|
value: "true"
|
||||||
|
- name: MONGODB_HOSTS
|
||||||
|
value: "localhost:27018"
|
||||||
|
- name: MONGODB_RS
|
||||||
|
value: "rs0"
|
||||||
|
- name: REDIS_HOST
|
||||||
|
value: "localhost"
|
||||||
|
- name: REDIS_PORT
|
||||||
|
value: "6379"
|
||||||
|
- name: REPORT_TOKEN
|
||||||
|
value: "report-token-1"
|
||||||
|
- name: REMOTE_MANAGEMENT_DISABLE
|
||||||
|
value: "0"
|
||||||
|
- name: HEALTHCHECKS_ALLOWFROM
|
||||||
|
value: "0.0.0.0/0"
|
||||||
|
{% for key, value in vars.env.items() %}
|
||||||
|
- name: {{ key }}
|
||||||
|
value: "{{ value }}"
|
||||||
|
{% endfor %}
|
||||||
|
{%- endif %}
|
||||||
|
|
||||||
{% if vars.redis is defined and vars.redis == "enabled" -%}
|
{% if vars.redis is defined and vars.redis == "enabled" -%}
|
||||||
- name: redis
|
- name: redis
|
||||||
image: redis:alpine
|
image: redis:alpine
|
||||||
|
|
|
@ -951,7 +951,7 @@ class Config extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.managementAgent = {};
|
this.managementAgent = {};
|
||||||
this.managementAgent.port = 8010;
|
this.managementAgent.port = 9753;
|
||||||
this.managementAgent.host = 'localhost';
|
this.managementAgent.host = 'localhost';
|
||||||
if (config.managementAgent !== undefined) {
|
if (config.managementAgent !== undefined) {
|
||||||
if (config.managementAgent.port !== undefined) {
|
if (config.managementAgent.port !== undefined) {
|
||||||
|
|
|
@ -100,7 +100,7 @@ class ChannelMessageV0 {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a wire representation of a metrics message
|
* Creates a wire representation of a metrics report message
|
||||||
*
|
*
|
||||||
* @param {object} body Metrics report
|
* @param {object} body Metrics report
|
||||||
*
|
*
|
||||||
|
@ -116,6 +116,40 @@ class ChannelMessageV0 {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a wire representation of a configuration overlay message
|
||||||
|
*
|
||||||
|
* @param {object} body configuration overlay
|
||||||
|
*
|
||||||
|
* @returns {Buffer} wire representation
|
||||||
|
*/
|
||||||
|
static encodeConfigOverlayMessage(body) {
|
||||||
|
const overlay = JSON.stringify(body);
|
||||||
|
const buf = Buffer.alloc(overlay.length + headerSize);
|
||||||
|
buf.writeUInt8(MessageType.CONFIG_OVERLAY_MESSAGE, 0);
|
||||||
|
buf.writeUInt8(0, 1);
|
||||||
|
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||||
|
buf.write(overlay, headerSize);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a wire representation of a metric request message
|
||||||
|
*
|
||||||
|
* @param {object} body Metrics request
|
||||||
|
*
|
||||||
|
* @returns {Buffer} wire representation
|
||||||
|
*/
|
||||||
|
static encodeMetricsRequestMessage(body) {
|
||||||
|
const request = JSON.stringify(body);
|
||||||
|
const buf = Buffer.alloc(request.length + headerSize);
|
||||||
|
buf.writeUInt8(MessageType.METRICS_REQUEST_MESSAGE, 0);
|
||||||
|
buf.writeUInt8(0, 1);
|
||||||
|
buf.writeUInt8(TargetType.TARGET_ANY, 2);
|
||||||
|
buf.write(request, headerSize);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protocol name used for subprotocol negociation
|
* Protocol name used for subprotocol negociation
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -82,13 +82,7 @@ function initManagementClient() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function isManagementAgentUsed() {
|
|
||||||
return process.env.MANAGEMENT_USE_AGENT === '1';
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
managementAgentMessageType,
|
managementAgentMessageType,
|
||||||
initManagementClient,
|
initManagementClient,
|
||||||
isManagementAgentUsed,
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -318,33 +318,10 @@ function loadCachedOverlay(log, callback) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function applyAndSaveOverlay(overlay, log) {
|
|
||||||
patchConfiguration(overlay, log, err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('could not apply pushed overlay', {
|
|
||||||
error: reshapeExceptionError(err),
|
|
||||||
method: 'applyAndSaveOverlay',
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
saveConfigurationVersion(null, overlay, log, err => {
|
|
||||||
if (err) {
|
|
||||||
log.error('could not cache overlay version', {
|
|
||||||
error: reshapeExceptionError(err),
|
|
||||||
method: 'applyAndSaveOverlay',
|
|
||||||
});
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
log.info('overlay push processed');
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
loadCachedOverlay,
|
loadCachedOverlay,
|
||||||
managementDatabaseName,
|
managementDatabaseName,
|
||||||
patchConfiguration,
|
patchConfiguration,
|
||||||
saveConfigurationVersion,
|
saveConfigurationVersion,
|
||||||
remoteOverlayIsNewer,
|
remoteOverlayIsNewer,
|
||||||
applyAndSaveOverlay,
|
|
||||||
};
|
};
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/* From RFC6455 which defines some reserved status code ranges, the range
|
||||||
|
* 4000-4999 are for private use. */
|
||||||
|
const WS_STATUS_IDLE = {
|
||||||
|
code: 4000,
|
||||||
|
reason: 'does not reply to ping before timeout',
|
||||||
|
};
|
||||||
|
|
||||||
|
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS =
|
||||||
|
process.env.MANAGEMENT_CHECK_CLIENT_FQCY_MS || 15000;
|
||||||
|
|
||||||
|
const endpointPath = 'api/v1/instance';
|
||||||
|
|
||||||
|
const managementEndpointRoot =
|
||||||
|
process.env.MANAGEMENT_ENDPOINT ||
|
||||||
|
'https://api.zenko.io';
|
||||||
|
const managementEndpoint = `${managementEndpointRoot}/${endpointPath}`;
|
||||||
|
|
||||||
|
const pushEndpointRoot =
|
||||||
|
process.env.PUSH_ENDPOINT ||
|
||||||
|
'https://push.api.zenko.io';
|
||||||
|
const pushEndpoint = `${pushEndpointRoot}/${endpointPath}`;
|
||||||
|
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
WS_STATUS_IDLE,
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
managementEndpoint,
|
||||||
|
pushEndpoint,
|
||||||
|
};
|
|
@ -3,6 +3,7 @@ const forge = require('node-forge');
|
||||||
const request = require('request');
|
const request = require('request');
|
||||||
|
|
||||||
const metadata = require('../metadata/wrapper');
|
const metadata = require('../metadata/wrapper');
|
||||||
|
const { managementEndpoint } = require('./constants');
|
||||||
|
|
||||||
const managementDatabaseName = 'PENSIEVE';
|
const managementDatabaseName = 'PENSIEVE';
|
||||||
const tokenConfigurationKey = 'auth/zenko/remote-management-token';
|
const tokenConfigurationKey = 'auth/zenko/remote-management-token';
|
||||||
|
@ -25,7 +26,7 @@ function getStoredCredentials(log, callback) {
|
||||||
log, callback);
|
log, callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
function issueCredentials(instanceId, log, callback) {
|
||||||
log.info('registering with API to get token');
|
log.info('registering with API to get token');
|
||||||
|
|
||||||
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
|
const keyPair = forge.pki.rsa.generateKeyPair({ bits: 2048, e: 0x10001 });
|
||||||
|
@ -54,8 +55,7 @@ function issueCredentials(managementEndpoint, instanceId, log, callback) {
|
||||||
}).json(postData);
|
}).json(postData);
|
||||||
}
|
}
|
||||||
|
|
||||||
function confirmInstanceCredentials(
|
function confirmInstanceCredentials(instanceId, creds, log, callback) {
|
||||||
managementEndpoint, instanceId, creds, log, callback) {
|
|
||||||
const opts = {
|
const opts = {
|
||||||
headers: {
|
headers: {
|
||||||
'x-instance-authentication-token': creds.token,
|
'x-instance-authentication-token': creds.token,
|
||||||
|
@ -84,7 +84,6 @@ function confirmInstanceCredentials(
|
||||||
* is registered as new against the Orbit API with newly-generated
|
* is registered as new against the Orbit API with newly-generated
|
||||||
* RSA key pair.
|
* RSA key pair.
|
||||||
*
|
*
|
||||||
* @param {string} managementEndpoint API endpoint
|
|
||||||
* @param {string} instanceId UUID of this deployment
|
* @param {string} instanceId UUID of this deployment
|
||||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
||||||
* initialization process
|
* initialization process
|
||||||
|
@ -92,12 +91,11 @@ function confirmInstanceCredentials(
|
||||||
*
|
*
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
function initManagementCredentials(
|
function initManagementCredentials(instanceId, log, callback) {
|
||||||
managementEndpoint, instanceId, log, callback) {
|
|
||||||
getStoredCredentials(log, (error, value) => {
|
getStoredCredentials(log, (error, value) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
if (error.NoSuchKey) {
|
if (error.NoSuchKey) {
|
||||||
return issueCredentials(managementEndpoint, instanceId, log,
|
return issueCredentials(instanceId, log,
|
||||||
(error, value) => {
|
(error, value) => {
|
||||||
if (error) {
|
if (error) {
|
||||||
log.error('could not issue token',
|
log.error('could not issue token',
|
||||||
|
@ -118,8 +116,7 @@ function initManagementCredentials(
|
||||||
log.info('saved token locally, ' +
|
log.info('saved token locally, ' +
|
||||||
'confirming instance');
|
'confirming instance');
|
||||||
return confirmInstanceCredentials(
|
return confirmInstanceCredentials(
|
||||||
managementEndpoint, instanceId, value, log,
|
instanceId, value, log, callback);
|
||||||
callback);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
const arsenal = require('arsenal');
|
const arsenal = require('arsenal');
|
||||||
|
const errors = arsenal.errors;
|
||||||
const async = require('async');
|
const async = require('async');
|
||||||
|
|
||||||
const metadata = require('../metadata/wrapper');
|
const metadata = require('../metadata/wrapper');
|
||||||
|
@ -13,19 +14,9 @@ const { initManagementCredentials } = require('./credentials');
|
||||||
const { startWSManagementClient } = require('./push');
|
const { startWSManagementClient } = require('./push');
|
||||||
const { startPollingManagementClient } = require('./poll');
|
const { startPollingManagementClient } = require('./poll');
|
||||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||||
const { isManagementAgentUsed } = require('./agentClient');
|
|
||||||
|
|
||||||
const initRemoteManagementRetryDelay = 10000;
|
const initRemoteManagementRetryDelay = 10000;
|
||||||
|
|
||||||
const managementEndpointRoot =
|
|
||||||
process.env.MANAGEMENT_ENDPOINT ||
|
|
||||||
'https://api.zenko.io';
|
|
||||||
const managementEndpoint = `${managementEndpointRoot}/api/v1/instance`;
|
|
||||||
|
|
||||||
const pushEndpointRoot =
|
|
||||||
process.env.PUSH_ENDPOINT ||
|
|
||||||
'https://push.api.zenko.io';
|
|
||||||
const pushEndpoint = `${pushEndpointRoot}/api/v1/instance`;
|
|
||||||
|
|
||||||
function initManagementDatabase(log, callback) {
|
function initManagementDatabase(log, callback) {
|
||||||
// XXX choose proper owner names
|
// XXX choose proper owner names
|
||||||
|
@ -48,12 +39,12 @@ function initManagementDatabase(log, callback) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function startManagementListeners(instanceId, token) {
|
function startManagementListeners(instanceId, newOverlayCallback, token) {
|
||||||
const mode = process.env.MANAGEMENT_MODE || 'push';
|
const mode = process.env.MANAGEMENT_MODE || 'push';
|
||||||
if (mode === 'push') {
|
if (mode === 'push') {
|
||||||
startWSManagementClient(pushEndpoint, instanceId, token);
|
startWSManagementClient(instanceId, token, newOverlayCallback);
|
||||||
} else {
|
} else {
|
||||||
startPollingManagementClient(managementEndpoint, instanceId, token);
|
startPollingManagementClient(instanceId, token, newOverlayCallback);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,14 +57,15 @@ function startManagementListeners(instanceId, token) {
|
||||||
* - loading and applying the latest cached overlay configuration
|
* - loading and applying the latest cached overlay configuration
|
||||||
* - starting a configuration update and metrics push background task
|
* - starting a configuration update and metrics push background task
|
||||||
*
|
*
|
||||||
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
|
* @param {werelogs~Logger} log Request-scoped logger to be able to
|
||||||
* initialization process
|
* trace initialization process
|
||||||
* @param {function} callback Function to call once the overlay is loaded
|
* @param {function} newOverlayCallback Function to call once a new overlay
|
||||||
* (overlay)
|
* is received (overlay)
|
||||||
*
|
* @param {function} callback Function to call once the overlay
|
||||||
|
* is loaded (overlay)
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
function initManagement(log, callback) {
|
function initManagement(log, newOverlayCallback, callback) {
|
||||||
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
|
||||||
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|
||||||
|| process.env.S3BACKEND === 'mem') {
|
|| process.env.S3BACKEND === 'mem') {
|
||||||
|
@ -81,34 +73,28 @@ function initManagement(log, callback) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Temporary check before to fully move to the process management agent. */
|
|
||||||
if (isManagementAgentUsed() ^ typeof callback === 'function') {
|
|
||||||
let msg = 'misuse of initManagement function: ';
|
|
||||||
msg += `MANAGEMENT_USE_AGENT: ${process.env.MANAGEMENT_USE_AGENT}`;
|
|
||||||
msg += `, callback type: ${typeof callback}`;
|
|
||||||
throw new Error(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
// eslint-disable-next-line arrow-body-style
|
cb => metadata.setup(cb),
|
||||||
cb => { return isManagementAgentUsed() ? metadata.setup(cb) : cb(); },
|
|
||||||
cb => initManagementDatabase(log, cb),
|
cb => initManagementDatabase(log, cb),
|
||||||
cb => metadata.getUUID(log, cb),
|
cb => metadata.getUUID(log, cb),
|
||||||
(instanceId, cb) => initManagementCredentials(
|
(instanceId, cb) => {
|
||||||
managementEndpoint, instanceId, log, cb),
|
const initialID = process.env.INITIAL_INSTANCE_ID;
|
||||||
(instanceId, token, cb) => {
|
if (initialID && initialID !== instanceId) {
|
||||||
if (!isManagementAgentUsed()) {
|
log.error('INITIAL_INSTANCE_ID value is different from ' +
|
||||||
cb(null, instanceId, token, {});
|
'instance id saved in metadata', {
|
||||||
return;
|
envVarValue: initialID,
|
||||||
|
metadataValue: instanceId,
|
||||||
|
});
|
||||||
|
return callback(errors.InvalidParameterValue);
|
||||||
}
|
}
|
||||||
|
return cb(null, instanceId);
|
||||||
|
},
|
||||||
|
(instanceId, cb) => initManagementCredentials(instanceId, log, cb),
|
||||||
|
(instanceId, token, cb) => {
|
||||||
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
|
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
|
||||||
token, overlay));
|
token, overlay));
|
||||||
},
|
},
|
||||||
(instanceId, token, overlay, cb) => {
|
(instanceId, token, overlay, cb) => {
|
||||||
if (!isManagementAgentUsed()) {
|
|
||||||
cb(null, instanceId, token, overlay);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
patchConfiguration(overlay, log,
|
patchConfiguration(overlay, log,
|
||||||
err => cb(err, instanceId, token, overlay));
|
err => cb(err, instanceId, token, overlay));
|
||||||
},
|
},
|
||||||
|
@ -119,13 +105,16 @@ function initManagement(log, callback) {
|
||||||
method: 'initManagement' });
|
method: 'initManagement' });
|
||||||
setTimeout(initManagement,
|
setTimeout(initManagement,
|
||||||
initRemoteManagementRetryDelay,
|
initRemoteManagementRetryDelay,
|
||||||
logger.newRequestLogger());
|
logger.newRequestLogger(),
|
||||||
|
newOverlayCallback,
|
||||||
|
callback
|
||||||
|
);
|
||||||
} else {
|
} else {
|
||||||
log.info(`this deployment's Instance ID is ${instanceId}`);
|
log.info(`this deployment's Instance ID is ${instanceId}`);
|
||||||
log.end('management init done');
|
log.end('management init done');
|
||||||
startManagementListeners(instanceId, token);
|
startManagementListeners(instanceId, newOverlayCallback, token);
|
||||||
if (callback) {
|
if (callback) {
|
||||||
callback(overlay);
|
callback(null, overlay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -7,16 +7,17 @@ const logger = require('../utilities/logger');
|
||||||
const metadata = require('../metadata/wrapper');
|
const metadata = require('../metadata/wrapper');
|
||||||
const {
|
const {
|
||||||
loadCachedOverlay,
|
loadCachedOverlay,
|
||||||
patchConfiguration,
|
|
||||||
saveConfigurationVersion,
|
|
||||||
} = require('./configuration');
|
} = require('./configuration');
|
||||||
|
const { managementEndpoint } = require('./constants');
|
||||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||||
|
|
||||||
const pushReportDelay = 30000;
|
const pushReportDelay = 30000;
|
||||||
const pullConfigurationOverlayDelay = 60000;
|
const pullConfigurationOverlayDelay = 60000;
|
||||||
|
|
||||||
function loadRemoteOverlay(
|
let overlayMessageListener = null;
|
||||||
managementEndpoint, instanceId, remoteToken, cachedOverlay, log, cb) {
|
let cachedOverlay = null;
|
||||||
|
|
||||||
|
function loadRemoteOverlay(instanceId, remoteToken, log, cb) {
|
||||||
log.debug('loading remote overlay');
|
log.debug('loading remote overlay');
|
||||||
const opts = {
|
const opts = {
|
||||||
headers: {
|
headers: {
|
||||||
|
@ -30,29 +31,34 @@ function loadRemoteOverlay(
|
||||||
return cb(error);
|
return cb(error);
|
||||||
}
|
}
|
||||||
if (response.statusCode === 200) {
|
if (response.statusCode === 200) {
|
||||||
return cb(null, cachedOverlay, body);
|
return cb(null, body);
|
||||||
}
|
}
|
||||||
if (response.statusCode === 404) {
|
if (response.statusCode === 404) {
|
||||||
return cb(null, cachedOverlay, {});
|
return cb(null, {});
|
||||||
}
|
}
|
||||||
return cb(arsenal.errors.AccessForbidden, cachedOverlay, {});
|
return cb(arsenal.errors.AccessForbidden, {});
|
||||||
}).json();
|
}).json();
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO save only after successful patch
|
// TODO save only after successful patch
|
||||||
function applyConfigurationOverlay(
|
function applyConfigurationOverlay(instanceId, remoteToken, log) {
|
||||||
managementEndpoint, instanceId, remoteToken, log) {
|
|
||||||
async.waterfall([
|
async.waterfall([
|
||||||
wcb => loadCachedOverlay(log, wcb),
|
wcb => {
|
||||||
(cachedOverlay, wcb) => patchConfiguration(cachedOverlay,
|
if (cachedOverlay) {
|
||||||
log, wcb),
|
wcb(null, cachedOverlay);
|
||||||
(cachedOverlay, wcb) =>
|
} else {
|
||||||
loadRemoteOverlay(managementEndpoint, instanceId, remoteToken,
|
loadCachedOverlay(log, wcb);
|
||||||
cachedOverlay, log, wcb),
|
}
|
||||||
(cachedOverlay, remoteOverlay, wcb) =>
|
},
|
||||||
saveConfigurationVersion(cachedOverlay, remoteOverlay, log, wcb),
|
overlay => { cachedOverlay = overlay; },
|
||||||
(remoteOverlay, wcb) => patchConfiguration(remoteOverlay,
|
wcb => loadRemoteOverlay(instanceId, remoteToken,
|
||||||
log, wcb),
|
log, wcb),
|
||||||
|
remoteOverlay => {
|
||||||
|
cachedOverlay = remoteOverlay;
|
||||||
|
if (overlayMessageListener) {
|
||||||
|
overlayMessageListener(JSON.stringify(cachedOverlay));
|
||||||
|
}
|
||||||
|
},
|
||||||
], error => {
|
], error => {
|
||||||
if (error) {
|
if (error) {
|
||||||
log.error('could not apply managed configuration',
|
log.error('could not apply managed configuration',
|
||||||
|
@ -60,12 +66,11 @@ function applyConfigurationOverlay(
|
||||||
method: 'applyConfigurationOverlay' });
|
method: 'applyConfigurationOverlay' });
|
||||||
}
|
}
|
||||||
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
|
setTimeout(applyConfigurationOverlay, pullConfigurationOverlayDelay,
|
||||||
managementEndpoint, instanceId, remoteToken,
|
instanceId, remoteToken, logger.newRequestLogger());
|
||||||
logger.newRequestLogger());
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function postStats(managementEndpoint, instanceId, remoteToken, next) {
|
function postStats(instanceId, remoteToken, next) {
|
||||||
const toURL = `${managementEndpoint}/${instanceId}/stats`;
|
const toURL = `${managementEndpoint}/${instanceId}/stats`;
|
||||||
const toOptions = {
|
const toOptions = {
|
||||||
headers: {
|
headers: {
|
||||||
|
@ -99,14 +104,19 @@ function getStats() {
|
||||||
return request(fromURL, fromOptions).json();
|
return request(fromURL, fromOptions).json();
|
||||||
}
|
}
|
||||||
|
|
||||||
function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
function pushStats(instanceId, remoteToken, next) {
|
||||||
if (process.env.PUSH_STATS === 'false') {
|
if (process.env.PUSH_STATS === 'false') {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
getStats().pipe(
|
getStats()
|
||||||
postStats(managementEndpoint, instanceId, remoteToken, next));
|
.on('error', err => {
|
||||||
setTimeout(pushStats, pushReportDelay,
|
/* If the management process launches too quick, the cloud server may
|
||||||
managementEndpoint, instanceId, remoteToken);
|
* not be listening yet. */
|
||||||
|
logger.info('failed to get stats', { err });
|
||||||
|
})
|
||||||
|
.pipe(
|
||||||
|
postStats(instanceId, remoteToken, next));
|
||||||
|
setTimeout(pushStats, pushReportDelay, instanceId, remoteToken);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -115,20 +125,23 @@ function pushStats(managementEndpoint, instanceId, remoteToken, next) {
|
||||||
* Periodically polls for configuration updates, and pushes stats at
|
* Periodically polls for configuration updates, and pushes stats at
|
||||||
* a fixed interval.
|
* a fixed interval.
|
||||||
*
|
*
|
||||||
* @param {string} managementEndpoint API endpoint
|
|
||||||
* @param {string} instanceId UUID of this deployment
|
* @param {string} instanceId UUID of this deployment
|
||||||
* @param {string} remoteToken API authentication token
|
* @param {string} remoteToken API authentication token
|
||||||
|
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||||
|
* loaded (overlay)
|
||||||
*
|
*
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
function startPollingManagementClient(
|
function startPollingManagementClient(
|
||||||
managementEndpoint, instanceId, remoteToken) {
|
instanceId, remoteToken, newOverlayCallback) {
|
||||||
|
overlayMessageListener = newOverlayCallback;
|
||||||
|
|
||||||
metadata.notifyBucketChange(() => {
|
metadata.notifyBucketChange(() => {
|
||||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
pushStats(instanceId, remoteToken);
|
||||||
});
|
});
|
||||||
|
|
||||||
pushStats(managementEndpoint, instanceId, remoteToken);
|
pushStats(instanceId, remoteToken);
|
||||||
applyConfigurationOverlay(managementEndpoint, instanceId, remoteToken,
|
applyConfigurationOverlay(instanceId, remoteToken,
|
||||||
logger.newRequestLogger());
|
logger.newRequestLogger());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,19 +4,16 @@ const net = require('net');
|
||||||
const request = require('request');
|
const request = require('request');
|
||||||
const { URL } = require('url');
|
const { URL } = require('url');
|
||||||
const WebSocket = require('ws');
|
const WebSocket = require('ws');
|
||||||
const assert = require('assert');
|
|
||||||
|
|
||||||
const _config = require('../Config').config;
|
const _config = require('../Config').config;
|
||||||
const logger = require('../utilities/logger');
|
const logger = require('../utilities/logger');
|
||||||
const metadata = require('../metadata/wrapper');
|
const metadata = require('../metadata/wrapper');
|
||||||
|
|
||||||
const { reshapeExceptionError } = arsenal.errorUtils;
|
const { reshapeExceptionError } = arsenal.errorUtils;
|
||||||
const { isManagementAgentUsed } = require('./agentClient');
|
|
||||||
const { applyAndSaveOverlay } = require('./configuration');
|
|
||||||
const {
|
const {
|
||||||
ChannelMessageV0,
|
ChannelMessageV0,
|
||||||
MessageType,
|
MessageType,
|
||||||
} = require('./ChannelMessageV0');
|
} = require('./ChannelMessageV0');
|
||||||
|
const { pushEndpoint } = require('./constants');
|
||||||
|
|
||||||
const {
|
const {
|
||||||
CONFIG_OVERLAY_MESSAGE,
|
CONFIG_OVERLAY_MESSAGE,
|
||||||
|
@ -65,19 +62,23 @@ function createWSAgent(pushEndpoint, env, log) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function pushEndpointUrlFromInstanceId(instanceId) {
|
||||||
|
return `${pushEndpoint}/${instanceId}/ws`;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts background task that updates configuration and pushes stats.
|
* Starts background task that updates configuration and pushes stats.
|
||||||
*
|
*
|
||||||
* Receives pushed Websocket messages on configuration updates, and
|
* Receives pushed Websocket messages on configuration updates, and
|
||||||
* sends stat messages in response to API sollicitations.
|
* sends stat messages in response to API sollicitations.
|
||||||
*
|
*
|
||||||
* @param {string} pushEndpoint API endpoint
|
|
||||||
* @param {string} instanceId UUID of this deployment
|
* @param {string} instanceId UUID of this deployment
|
||||||
* @param {string} token API authentication token
|
* @param {string} token API authentication token
|
||||||
*
|
* @param {function} newOverlayCallback Function to call once a new overlay is
|
||||||
|
* loaded (overlay)
|
||||||
* @returns {undefined}
|
* @returns {undefined}
|
||||||
*/
|
*/
|
||||||
function startWSManagementClient(pushEndpoint, instanceId, token) {
|
function startWSManagementClient(instanceId, token, newOverlayCallback) {
|
||||||
logger.info('connecting to push server');
|
logger.info('connecting to push server');
|
||||||
function _logError(error, errorMessage, method) {
|
function _logError(error, errorMessage, method) {
|
||||||
if (error) {
|
if (error) {
|
||||||
|
@ -86,14 +87,18 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
overlayMessageListener = newOverlayCallback;
|
||||||
|
|
||||||
const socketsByChannelId = [];
|
const socketsByChannelId = [];
|
||||||
const headers = {
|
const headers = {
|
||||||
'x-instance-authentication-token': token,
|
'x-instance-authentication-token': token,
|
||||||
};
|
};
|
||||||
const agent = createWSAgent(pushEndpoint, process.env, logger);
|
const agent = createWSAgent(pushEndpoint, process.env, logger);
|
||||||
|
|
||||||
const url = `${pushEndpoint}/${instanceId}/ws`;
|
const url = pushEndpointUrlFromInstanceId(instanceId);
|
||||||
|
|
||||||
const ws = new WebSocket(url, subprotocols, { headers, agent });
|
const ws = new WebSocket(url, subprotocols, { headers, agent });
|
||||||
|
|
||||||
let pingTimeout = null;
|
let pingTimeout = null;
|
||||||
|
|
||||||
function sendPing() {
|
function sendPing() {
|
||||||
|
@ -112,7 +117,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
if (process.env.PUSH_STATS === 'false') {
|
if (process.env.PUSH_STATS === 'false') {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const fromURL = `http://localhost:${_config.port}/_/report`;
|
const fromURL = process.env.STAT_REPORT_URL ||
|
||||||
|
`http://localhost:${_config.port}/_/report`;
|
||||||
const fromOptions = {
|
const fromOptions = {
|
||||||
headers: {
|
headers: {
|
||||||
'x-scal-report-token': process.env.REPORT_TOKEN,
|
'x-scal-report-token': process.env.REPORT_TOKEN,
|
||||||
|
@ -121,7 +127,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
};
|
};
|
||||||
request(fromURL, fromOptions, (err, response, body) => {
|
request(fromURL, fromOptions, (err, response, body) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
_logError(err, 'failed to get metrics report', 'pushStats');
|
const what = 'failed to get metrics report';
|
||||||
|
const msg =
|
||||||
|
`${what} at ${fromURL} with headers ${fromOptions}`;
|
||||||
|
_logError(err, msg, 'pushStats');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
|
||||||
|
@ -204,8 +213,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
metadata.notifyBucketChange(null);
|
metadata.notifyBucketChange(null);
|
||||||
_config.removeListener('browser-access-enabled-change',
|
_config.removeListener('browser-access-enabled-change',
|
||||||
browserAccessChangeHandler);
|
browserAccessChangeHandler);
|
||||||
setTimeout(startWSManagementClient, 10000, pushEndpoint,
|
const timeout = process.env.ORBIT_CONNECTION_TIMEOUT_MS || 10000;
|
||||||
instanceId, token);
|
|
||||||
|
setTimeout(startWSManagementClient, timeout, instanceId, token,
|
||||||
|
newOverlayCallback);
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('error', err => {
|
ws.on('error', err => {
|
||||||
|
@ -224,18 +235,13 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
});
|
});
|
||||||
|
|
||||||
ws.on('message', data => {
|
ws.on('message', data => {
|
||||||
const log = logger.newRequestLogger();
|
|
||||||
const message = new ChannelMessageV0(data);
|
const message = new ChannelMessageV0(data);
|
||||||
|
|
||||||
switch (message.getType()) {
|
switch (message.getType()) {
|
||||||
case CONFIG_OVERLAY_MESSAGE:
|
case CONFIG_OVERLAY_MESSAGE:
|
||||||
if (!isManagementAgentUsed()) {
|
|
||||||
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
|
|
||||||
} else {
|
|
||||||
if (overlayMessageListener) {
|
if (overlayMessageListener) {
|
||||||
overlayMessageListener(message.getPayload().toString());
|
overlayMessageListener(message.getPayload().toString());
|
||||||
}
|
}
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case METRICS_REQUEST_MESSAGE:
|
case METRICS_REQUEST_MESSAGE:
|
||||||
pushStats();
|
pushStats();
|
||||||
|
@ -256,13 +262,8 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
function addOverlayMessageListener(callback) {
|
|
||||||
assert(typeof callback === 'function');
|
|
||||||
overlayMessageListener = callback;
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
createWSAgent,
|
createWSAgent,
|
||||||
startWSManagementClient,
|
startWSManagementClient,
|
||||||
addOverlayMessageListener,
|
pushEndpointUrlFromInstanceId,
|
||||||
};
|
};
|
||||||
|
|
|
@ -14,11 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
|
||||||
const api = require('./api/api');
|
const api = require('./api/api');
|
||||||
const data = require('./data/wrapper');
|
const data = require('./data/wrapper');
|
||||||
const metadata = require('./metadata/wrapper');
|
const metadata = require('./metadata/wrapper');
|
||||||
const { initManagement } = require('./management');
|
const { initManagementClient } = require('./management/agentClient');
|
||||||
const {
|
|
||||||
initManagementClient,
|
|
||||||
isManagementAgentUsed,
|
|
||||||
} = require('./management/agentClient');
|
|
||||||
|
|
||||||
const routes = arsenal.s3routes.routes;
|
const routes = arsenal.s3routes.routes;
|
||||||
const websiteEndpoints = _config.websiteEndpoints;
|
const websiteEndpoints = _config.websiteEndpoints;
|
||||||
|
@ -40,7 +36,6 @@ const STATS_INTERVAL = 5; // 5 seconds
|
||||||
const STATS_EXPIRY = 30; // 30 seconds
|
const STATS_EXPIRY = 30; // 30 seconds
|
||||||
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
|
const statsClient = new StatsClient(localCacheClient, STATS_INTERVAL,
|
||||||
STATS_EXPIRY);
|
STATS_EXPIRY);
|
||||||
const enableRemoteManagement = true;
|
|
||||||
|
|
||||||
class S3Server {
|
class S3Server {
|
||||||
/**
|
/**
|
||||||
|
@ -153,16 +148,8 @@ class S3Server {
|
||||||
|
|
||||||
// TODO this should wait for metadata healthcheck to be ok
|
// TODO this should wait for metadata healthcheck to be ok
|
||||||
// TODO only do this in cluster master
|
// TODO only do this in cluster master
|
||||||
if (enableRemoteManagement) {
|
|
||||||
if (!isManagementAgentUsed()) {
|
|
||||||
setTimeout(() => {
|
|
||||||
initManagement(logger.newRequestLogger());
|
|
||||||
}, 5000);
|
|
||||||
} else {
|
|
||||||
initManagementClient();
|
initManagementClient();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* This exits the running process properly.
|
* This exits the running process properly.
|
||||||
|
|
|
@ -5,19 +5,20 @@ const logger = require('./lib/utilities/logger');
|
||||||
const { initManagement } = require('./lib/management');
|
const { initManagement } = require('./lib/management');
|
||||||
const _config = require('./lib/Config').config;
|
const _config = require('./lib/Config').config;
|
||||||
const { managementAgentMessageType } = require('./lib/management/agentClient');
|
const { managementAgentMessageType } = require('./lib/management/agentClient');
|
||||||
const { addOverlayMessageListener } = require('./lib/management/push');
|
|
||||||
const { saveConfigurationVersion } = require('./lib/management/configuration');
|
const { saveConfigurationVersion } = require('./lib/management/configuration');
|
||||||
|
const {
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
WS_STATUS_IDLE,
|
||||||
|
} = require('./lib/management/constants');
|
||||||
|
|
||||||
|
|
||||||
// TODO: auth?
|
// TODO: auth?
|
||||||
// TODO: werelogs with a specific name.
|
// TODO: werelogs with a specific name.
|
||||||
|
|
||||||
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
|
|
||||||
|
|
||||||
|
|
||||||
class ManagementAgentServer {
|
class ManagementAgentServer {
|
||||||
constructor() {
|
constructor() {
|
||||||
this.port = _config.managementAgent.port || 8010;
|
this.port = _config.managementAgent.port || 9753;
|
||||||
this.wss = null;
|
this.wss = null;
|
||||||
this.loadedOverlay = null;
|
this.loadedOverlay = null;
|
||||||
|
|
||||||
|
@ -29,25 +30,21 @@ class ManagementAgentServer {
|
||||||
process.on('SIGPIPE', () => {});
|
process.on('SIGPIPE', () => {});
|
||||||
}
|
}
|
||||||
|
|
||||||
start(_cb) {
|
start() {
|
||||||
const cb = _cb || function noop() {};
|
|
||||||
|
|
||||||
/* Define REPORT_TOKEN env variable needed by the management
|
/* Define REPORT_TOKEN env variable needed by the management
|
||||||
* module. */
|
* module. */
|
||||||
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|
||||||
|| _config.reportToken
|
|| _config.reportToken
|
||||||
|| Uuid.v4();
|
|| Uuid.v4();
|
||||||
|
|
||||||
initManagement(logger.newRequestLogger(), overlay => {
|
/* The initManegement function retries when it fails. */
|
||||||
let error = null;
|
const log = logger.newRequestLogger();
|
||||||
|
initManagement(log, this.onNewOverlay.bind(this), (err, overlay) => {
|
||||||
if (overlay) {
|
if (err) {
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
this.loadedOverlay = overlay;
|
this.loadedOverlay = overlay;
|
||||||
this.startServer();
|
this.startServer();
|
||||||
} else {
|
|
||||||
error = new Error('failed to init management');
|
|
||||||
}
|
|
||||||
return cb(error);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,8 +72,6 @@ class ManagementAgentServer {
|
||||||
|
|
||||||
setInterval(this.checkBrokenConnections.bind(this),
|
setInterval(this.checkBrokenConnections.bind(this),
|
||||||
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||||
|
|
||||||
addOverlayMessageListener(this.onNewOverlay.bind(this));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
onConnection(socket, request) {
|
onConnection(socket, request) {
|
||||||
|
@ -154,6 +149,14 @@ class ManagementAgentServer {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
this.loadedOverlay = remoteOverlayObj;
|
this.loadedOverlay = remoteOverlayObj;
|
||||||
|
if (this.loadedOverlay.browserAccess) {
|
||||||
|
if (Boolean(_config.browserAccessEnabled) !==
|
||||||
|
Boolean(this.loadedOverlay.browserAccess.enabled)) {
|
||||||
|
_config.browserAccessEnabled =
|
||||||
|
Boolean(this.loadedOverlay.browserAccess.enabled);
|
||||||
|
_config.emit('browser-access-enabled-change');
|
||||||
|
}
|
||||||
|
}
|
||||||
this.wss.clients.forEach(
|
this.wss.clients.forEach(
|
||||||
this._sendNewOverlayToClient.bind(this)
|
this._sendNewOverlayToClient.bind(this)
|
||||||
);
|
);
|
||||||
|
@ -166,7 +169,7 @@ class ManagementAgentServer {
|
||||||
logger.info('close broken connection', {
|
logger.info('close broken connection', {
|
||||||
client: client._socket._peername,
|
client: client._socket._peername,
|
||||||
});
|
});
|
||||||
client.terminate();
|
client.close(WS_STATUS_IDLE.code, WS_STATUS_IDLE.reason);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
client.isAlive = false;
|
client.isAlive = false;
|
||||||
|
|
|
@ -84,17 +84,19 @@
|
||||||
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
|
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
|
||||||
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
|
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
|
||||||
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
|
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
|
||||||
|
"ft_management_agent": "cd tests/functional/managementAgent/ && npm test",
|
||||||
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
|
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
|
||||||
"lint": "eslint $(git ls-files '*.js')",
|
"lint": "eslint $(git ls-files '*.js')",
|
||||||
"lint_md": "mdlint $(git ls-files '*.md')",
|
"lint_md": "mdlint $(git ls-files '*.md')",
|
||||||
"mem_backend": "S3BACKEND=mem node index.js",
|
"mem_backend": "S3BACKEND=mem node index.js",
|
||||||
"perf": "mocha tests/performance/s3standard.js",
|
"perf": "mocha tests/performance/s3standard.js",
|
||||||
"start": "npm-run-all --parallel start_dmd start_s3server",
|
"start": "npm-run-all --parallel start_dmd start_management start_s3server",
|
||||||
"start_mongo": "npm run cloudserver",
|
"start_mongo": "npm run cloudserver",
|
||||||
"start_mdserver": "node mdserver.js",
|
"start_mdserver": "node mdserver.js",
|
||||||
"start_dataserver": "node dataserver.js",
|
"start_dataserver": "node dataserver.js",
|
||||||
"start_s3server": "node index.js",
|
"start_s3server": "node index.js",
|
||||||
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
|
||||||
|
"start_management": "node managementAgent.js",
|
||||||
"start_utapi": "node lib/utapi/utapi.js",
|
"start_utapi": "node lib/utapi/utapi.js",
|
||||||
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
"utapi_replay": "node lib/utapi/utapiReplay.js",
|
||||||
"management_agent": "node managementAgent.js",
|
"management_agent": "node managementAgent.js",
|
||||||
|
|
|
@ -0,0 +1,425 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
const assert = require('assert');
|
||||||
|
const EventEmitter = require('events');
|
||||||
|
const http = require('http');
|
||||||
|
const net = require('net');
|
||||||
|
const URL = require('url');
|
||||||
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
|
const _config = require('../../../lib/Config').config;
|
||||||
|
const {
|
||||||
|
managementAgentMessageType,
|
||||||
|
} = require('../../../lib/management/agentClient');
|
||||||
|
const {
|
||||||
|
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
|
||||||
|
WS_STATUS_IDLE,
|
||||||
|
} = require('../../../lib/management/constants');
|
||||||
|
const {
|
||||||
|
OrbitSimulatorServer,
|
||||||
|
OrbitSimulatorEvent,
|
||||||
|
} = require('./orbitServerSimulator');
|
||||||
|
const {
|
||||||
|
ChannelMessageV0,
|
||||||
|
MessageType,
|
||||||
|
} = require('../../../lib/management/ChannelMessageV0');
|
||||||
|
|
||||||
|
|
||||||
|
const PAYLOAD_MSG_TYPE = MessageType.CHANNEL_PAYLOAD_MESSAGE;
|
||||||
|
const CONFIG_OVERLAY_MSG_TYPE = MessageType.CONFIG_OVERLAY_MESSAGE;
|
||||||
|
const METRICS_REQ_MSG_TYPE = MessageType.METRICS_REQUEST_MESSAGE;
|
||||||
|
const METRICS_REPORT_MSG_TYPE = MessageType.METRICS_REPORT_MESSAGE;
|
||||||
|
const CHAN_CLOSE_MSG_TYPE = MessageType.CHANNEL_CLOSE_MESSAGE;
|
||||||
|
|
||||||
|
|
||||||
|
function testWithBrowserAccessEnabled() {
|
||||||
|
before(done => {
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
const overlay = { browserAccess: { enabled: false } };
|
||||||
|
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
|
||||||
|
orbitSimulator.stop(done);
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not forward payload message if no overlay has been received',
|
||||||
|
done => {
|
||||||
|
const chanId = 1;
|
||||||
|
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
|
||||||
|
/* Payload message socket server. The management process forwards
|
||||||
|
* payload message from Orbit. */
|
||||||
|
const server = new net.Server();
|
||||||
|
server.on('connection', () => {
|
||||||
|
done(new Error('should not connect to payload message server'));
|
||||||
|
});
|
||||||
|
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||||
|
|
||||||
|
/* Once the management process is connected to orbit, send it a
|
||||||
|
* payload message. It is supposed to create the channel ID socket
|
||||||
|
* with the payload message server. */
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
|
||||||
|
setTimeout(() => {
|
||||||
|
server.close(() => {
|
||||||
|
orbitSimulator.stop(done);
|
||||||
|
});
|
||||||
|
}, 4000);
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
/* XXX: this test set the managementProcess config browserAccessEnabled to
|
||||||
|
* true, allowing it to forward payload message. This internal state value
|
||||||
|
* is required for the following tests. */
|
||||||
|
it('should close a channel ID when requested', done => {
|
||||||
|
const chanId = 1;
|
||||||
|
let closeMessageSent = false;
|
||||||
|
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
|
||||||
|
/* Payload message socket server. The management process forwards
|
||||||
|
* payload message from Orbit. */
|
||||||
|
const server = new net.Server();
|
||||||
|
server.on('connection', s => {
|
||||||
|
s.on('close', hadError => {
|
||||||
|
/* Make sure the connection had been closed after the CLOSE
|
||||||
|
* message has been sent by Orbit and there is no error. */
|
||||||
|
assert.strictEqual(hadError, false);
|
||||||
|
assert.strictEqual(closeMessageSent, true);
|
||||||
|
|
||||||
|
server.close(() => { orbitSimulator.stop(done); });
|
||||||
|
});
|
||||||
|
s.on('data', () => {
|
||||||
|
/* Once the payload message has been forwarded from Orbit
|
||||||
|
* to the socket server by the management process, make
|
||||||
|
* Orbit request this channel close. */
|
||||||
|
closeMessageSent = true;
|
||||||
|
orbitSimulator.sendMessage(CHAN_CLOSE_MSG_TYPE, '', chanId);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||||
|
|
||||||
|
/* Once the management process is connected to orbit, send it a
|
||||||
|
* payload message to create the channel ID with the payload
|
||||||
|
* message server. */
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
const overlay = { browserAccess: { enabled: true } };
|
||||||
|
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, overlay);
|
||||||
|
setTimeout(() => {
|
||||||
|
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, 'data', chanId);
|
||||||
|
}, 1000);
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
/* When the process management receives a payload message associated to
|
||||||
|
* a specific channel ID from Orbit, it forwards this message to a
|
||||||
|
* configured host/port socket.
|
||||||
|
* When it received data on this socket, it forwards it to Orbit. */
|
||||||
|
it('should forward payload message', done => {
|
||||||
|
const payload = 'payload';
|
||||||
|
const chanId = 1;
|
||||||
|
let socket = null;
|
||||||
|
|
||||||
|
/* Payload message socket server. The management process forwards
|
||||||
|
* payload message from Orbit. */
|
||||||
|
const server = new net.Server();
|
||||||
|
server.on('connection', s => {
|
||||||
|
socket = s;
|
||||||
|
|
||||||
|
/* After receiving the payload message sent by Orbit and
|
||||||
|
* forwarded by the management process, reply on the socket. */
|
||||||
|
socket.on('data', data => {
|
||||||
|
assert.strictEqual(data.toString(), payload);
|
||||||
|
|
||||||
|
socket.write(payload);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Once the management process is connected to orbit, send it a
|
||||||
|
* payload message. */
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId);
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Check orbit received the message. */
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||||
|
const message = new ChannelMessageV0(data);
|
||||||
|
|
||||||
|
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
|
||||||
|
assert.strictEqual(message.payload.toString(), payload);
|
||||||
|
assert.strictEqual(message.getChannelNumber(), chanId);
|
||||||
|
|
||||||
|
socket.end();
|
||||||
|
server.close(() => { orbitSimulator.stop(done); });
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should use one TCP socket per channel ID', done => {
|
||||||
|
const payload = 'payload';
|
||||||
|
const chanId1 = 1;
|
||||||
|
const chanId2 = 2;
|
||||||
|
const evtEmitter = new EventEmitter();
|
||||||
|
const clients = [];
|
||||||
|
const payloadMsgFwdedEvent = 'payloadMessageForwarded';
|
||||||
|
let socketServerFirstMsgReceived = false;
|
||||||
|
let orbitFirstMsgReceived = false;
|
||||||
|
|
||||||
|
/* Payload message socket server. The management process forwards
|
||||||
|
* payload message from Orbit. */
|
||||||
|
const server = new net.Server();
|
||||||
|
server.on('connection', s => {
|
||||||
|
assert.equal(clients.indexOf(s), -1);
|
||||||
|
|
||||||
|
clients.push(s);
|
||||||
|
s.on('data', () => { evtEmitter.emit(payloadMsgFwdedEvent); });
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Once the management process is connected to orbit, send it a
|
||||||
|
* payload message. */
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload, chanId1);
|
||||||
|
});
|
||||||
|
|
||||||
|
/* When the socket server receives the 1st payload, send an other
|
||||||
|
* one with a different channel id. In both cases, reply with a
|
||||||
|
* payload. */
|
||||||
|
evtEmitter.on(payloadMsgFwdedEvent, () => {
|
||||||
|
if (!socketServerFirstMsgReceived) {
|
||||||
|
assert.equal(clients.length, 1);
|
||||||
|
|
||||||
|
socketServerFirstMsgReceived = true;
|
||||||
|
clients[0].write(payload);
|
||||||
|
orbitSimulator.sendMessage(PAYLOAD_MSG_TYPE, payload,
|
||||||
|
chanId2);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.equal(clients.length, 2);
|
||||||
|
clients[1].write(payload);
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Check orbit received the 2 messages. */
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||||
|
const message = new ChannelMessageV0(data);
|
||||||
|
|
||||||
|
if (!orbitFirstMsgReceived) {
|
||||||
|
assert.strictEqual(message.getType(), PAYLOAD_MSG_TYPE);
|
||||||
|
assert.strictEqual(message.payload.toString(), payload);
|
||||||
|
assert.strictEqual(message.getChannelNumber(), chanId1);
|
||||||
|
|
||||||
|
orbitFirstMsgReceived = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Ignore CHANNEL_CLOSE_MESSAGE */
|
||||||
|
if (message.getType() !== PAYLOAD_MSG_TYPE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert.strictEqual(message.payload.toString(), payload);
|
||||||
|
assert.strictEqual(message.getChannelNumber(), chanId2);
|
||||||
|
|
||||||
|
clients.forEach(client => { client.end(); });
|
||||||
|
server.close(() => { orbitSimulator.stop(done); });
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT);
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Test the management process.
|
||||||
|
*
|
||||||
|
* This process connects to Orbit WebSocket server and is itself a WebSocket
|
||||||
|
* server. Testing it requires:
|
||||||
|
* - an orbit simulator,
|
||||||
|
* - a WebSocket client to connect to it,
|
||||||
|
* - HTTP server to answer the stats requests,
|
||||||
|
* - a socket server for payload messages.
|
||||||
|
**/
|
||||||
|
describe('Management process', function testSuite() {
|
||||||
|
this.timeout(120000);
|
||||||
|
|
||||||
|
/* Make sure the process management send payload message to a local host
|
||||||
|
* socket to be able to receive this message in this test. */
|
||||||
|
assert.strictEqual(process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST,
|
||||||
|
'localhost');
|
||||||
|
|
||||||
|
function createWs(path) {
|
||||||
|
const host = _config.managementAgent.host;
|
||||||
|
const port = _config.managementAgent.port;
|
||||||
|
const url = `ws://${host}:${port}/${path || 'watch'}`;
|
||||||
|
return new WebSocket(url);
|
||||||
|
}
|
||||||
|
|
||||||
|
it('should not listen on others routes than `watch`', done => {
|
||||||
|
const ws = createWs('wrong_path');
|
||||||
|
const msg = 'management agent process should not listen this route';
|
||||||
|
|
||||||
|
ws.on('open', () => { done(new Error(msg)); });
|
||||||
|
ws.on('unexpected-response', (_, response) => {
|
||||||
|
assert.strictEqual(response.statusCode, 400);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should listen on `watch` route', done => {
|
||||||
|
const ws = createWs();
|
||||||
|
|
||||||
|
ws.on('open', done);
|
||||||
|
ws.on('error', error => { done(error); });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should terminate the connection when a client does not answer ping',
|
||||||
|
done => {
|
||||||
|
this.timeout(2 * CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
|
||||||
|
|
||||||
|
const ws = createWs();
|
||||||
|
|
||||||
|
ws.on('close', (code, reason) => {
|
||||||
|
assert.strictEqual(code, WS_STATUS_IDLE.code);
|
||||||
|
assert.strictEqual(reason, WS_STATUS_IDLE.reason);
|
||||||
|
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
|
||||||
|
ws.on('error', error => { done(error); });
|
||||||
|
|
||||||
|
ws.on('message', () => {
|
||||||
|
/* Ugly eventTarget internal fields hacking to avoid this web
|
||||||
|
* socket to answer to ping messages. It will make
|
||||||
|
* the management agent to close the connection after a timeout.
|
||||||
|
* Defining an onPing event does not help, this internal
|
||||||
|
* function is still called. */
|
||||||
|
ws._receiver._events.ping = function noop() {};
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should connect to orbit', done => {
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
orbitSimulator.stop(done);
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should save the last overlay and send it to client on connection',
|
||||||
|
done => {
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
/* Send an overlay to management process. */
|
||||||
|
const body = 'body';
|
||||||
|
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE, body);
|
||||||
|
|
||||||
|
/* Connect to the process manager and check its saved
|
||||||
|
* overlay. */
|
||||||
|
const ws = createWs();
|
||||||
|
ws.on('error', error => { done(new Error(error)); });
|
||||||
|
ws.on('message', data => {
|
||||||
|
const msg = JSON.parse(data);
|
||||||
|
const type = managementAgentMessageType.NEW_OVERLAY;
|
||||||
|
|
||||||
|
assert.strictEqual(msg.messageType, type);
|
||||||
|
assert.strictEqual(msg.payload.toString(), body);
|
||||||
|
|
||||||
|
ws.terminate();
|
||||||
|
orbitSimulator.stop(done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send new overlay to its client', done => {
|
||||||
|
const body = 'body';
|
||||||
|
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
/* Connect to management process to receive the first overlay.
|
||||||
|
**/
|
||||||
|
const ws = createWs();
|
||||||
|
let firstMsgReceived = false;
|
||||||
|
|
||||||
|
ws.on('error', () => { done(new Error('connection error')); });
|
||||||
|
ws.on('message', data => {
|
||||||
|
const msg = JSON.parse(data);
|
||||||
|
|
||||||
|
if (!firstMsgReceived) {
|
||||||
|
firstMsgReceived = true;
|
||||||
|
/* Send a new overlay to management process. */
|
||||||
|
orbitSimulator.sendMessage(CONFIG_OVERLAY_MSG_TYPE,
|
||||||
|
body);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check we receive the second overlay. */
|
||||||
|
assert.strictEqual(msg.payload.toString(), body);
|
||||||
|
|
||||||
|
ws.terminate();
|
||||||
|
orbitSimulator.stop(done);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should get and send stats to orbit on stat requests', done => {
|
||||||
|
const stats = { stats: 'stats' };
|
||||||
|
|
||||||
|
/* Mock stats server, it returns a JSON object stringified on
|
||||||
|
* request on the stats path. */
|
||||||
|
const url = URL.parse(process.env.STAT_REPORT_URL);
|
||||||
|
const statServer = http.createServer((request, response) => {
|
||||||
|
if (request.url !== url.pathname) {
|
||||||
|
response.writeHead(400, { 'Content-type': 'text/plan' });
|
||||||
|
response.write('bad path');
|
||||||
|
response.end();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
response.writeHead(200, { 'Content-type': 'text/plan' });
|
||||||
|
response.write(JSON.stringify(stats));
|
||||||
|
response.end();
|
||||||
|
});
|
||||||
|
statServer.listen(url.port);
|
||||||
|
|
||||||
|
/* Once the management process is connected to orbit, send it a
|
||||||
|
* stat request. */
|
||||||
|
const orbitSimulator = new OrbitSimulatorServer();
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_CONNECTION, () => {
|
||||||
|
orbitSimulator.sendMessage(METRICS_REQ_MSG_TYPE, 'data');
|
||||||
|
});
|
||||||
|
|
||||||
|
/* And finally check the management process replies by a metrics
|
||||||
|
* report message. */
|
||||||
|
orbitSimulator.on(OrbitSimulatorEvent.EVENT_MESSAGE, data => {
|
||||||
|
const message = new ChannelMessageV0(data);
|
||||||
|
|
||||||
|
assert.strictEqual(message.getType(), METRICS_REPORT_MSG_TYPE);
|
||||||
|
assert.deepStrictEqual(JSON.parse(message.payload.toString()),
|
||||||
|
stats);
|
||||||
|
|
||||||
|
statServer.close(() => { orbitSimulator.stop(done); });
|
||||||
|
});
|
||||||
|
orbitSimulator.start();
|
||||||
|
});
|
||||||
|
|
||||||
|
/* Test suite which requires the management process to have received an
|
||||||
|
* overlay with the browserAccessEnabled set to true, otherwise payload
|
||||||
|
* message are not forwarded. This test suite is separated from the
|
||||||
|
* other tests because of its internal requirement. As long as there is
|
||||||
|
* no tests launching/stoping the management process there is no
|
||||||
|
* better way to test this. Anonymous function not used here to save an
|
||||||
|
* indentation level. */
|
||||||
|
describe('Management process with browser access enabled',
|
||||||
|
testWithBrowserAccessEnabled);
|
||||||
|
});
|
|
@ -0,0 +1,6 @@
|
||||||
|
{
|
||||||
|
"tests": {
|
||||||
|
"files": [ "/test" ],
|
||||||
|
"on": "aggressor"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,126 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const { EventEmitter } = require('events');
|
||||||
|
const url = require('url');
|
||||||
|
const WebSocket = require('ws');
|
||||||
|
|
||||||
|
const logger = require('../../../lib/utilities/logger');
|
||||||
|
const OrbitSimulatorEvent = {
|
||||||
|
EVENT_LISTENING: 'listening',
|
||||||
|
EVENT_CONNECTION: 'connection',
|
||||||
|
EVENT_MESSAGE: 'message',
|
||||||
|
};
|
||||||
|
const {
|
||||||
|
ChannelMessageV0,
|
||||||
|
MessageType,
|
||||||
|
} = require('../../../lib/management/ChannelMessageV0');
|
||||||
|
const {
|
||||||
|
pushEndpointUrlFromInstanceId,
|
||||||
|
} = require('../../../lib/management/push');
|
||||||
|
|
||||||
|
|
||||||
|
class OrbitSimulatorServer extends EventEmitter {
|
||||||
|
constructor() {
|
||||||
|
super();
|
||||||
|
|
||||||
|
assert(process.env.INITIAL_INSTANCE_ID,
|
||||||
|
'INITIAL_INSTANCE_ID env variable is required');
|
||||||
|
const instanceId = process.env.INITIAL_INSTANCE_ID;
|
||||||
|
|
||||||
|
const endpointUrl = pushEndpointUrlFromInstanceId(instanceId);
|
||||||
|
const endpointUrlObj = url.parse(endpointUrl);
|
||||||
|
|
||||||
|
assert(endpointUrlObj.host, 'localhost');
|
||||||
|
|
||||||
|
this.wss = null;
|
||||||
|
this.port = endpointUrlObj.port;
|
||||||
|
this.path = endpointUrlObj.path;
|
||||||
|
|
||||||
|
this.stop = this.stop.bind(this);
|
||||||
|
process.on('SIGINT', this.stop);
|
||||||
|
process.on('SIGHUP', this.stop);
|
||||||
|
process.on('SIGQUIT', this.stop);
|
||||||
|
process.on('SIGTERM', this.stop);
|
||||||
|
process.on('SIGPIPE', () => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
start() {
|
||||||
|
this.wss = new WebSocket.Server({
|
||||||
|
port: this.port,
|
||||||
|
path: this.path,
|
||||||
|
clientTracking: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.wss.on('connection', clientWs => {
|
||||||
|
clientWs.on('message', data => {
|
||||||
|
this.emit(OrbitSimulatorEvent.EVENT_MESSAGE, data);
|
||||||
|
});
|
||||||
|
this.emit(OrbitSimulatorEvent.EVENT_CONNECTION);
|
||||||
|
});
|
||||||
|
this.wss.on('listening', () => {
|
||||||
|
this.emit(OrbitSimulatorEvent.EVENT_LISTENING);
|
||||||
|
});
|
||||||
|
this.wss.on('error', error => {
|
||||||
|
logger.error('orbit CI simulator error', { error });
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
stop(cb) {
|
||||||
|
if (!this.wss) {
|
||||||
|
if (cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
this.wss.close(() => {
|
||||||
|
if (cb) {
|
||||||
|
cb();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
sendMessage(messageType, body, channelId) {
|
||||||
|
let message = null;
|
||||||
|
|
||||||
|
switch (messageType) {
|
||||||
|
case MessageType.CONFIG_OVERLAY_MESSAGE:
|
||||||
|
assert(body);
|
||||||
|
message = ChannelMessageV0.encodeConfigOverlayMessage(body);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MessageType.METRICS_REQUEST_MESSAGE:
|
||||||
|
assert(body);
|
||||||
|
message = ChannelMessageV0.encodeMetricsRequestMessage(body);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MessageType.METRICS_REPORT_MESSAGE:
|
||||||
|
assert(body);
|
||||||
|
message = ChannelMessageV0.encodeMetricsReportMessage(body);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MessageType.CHANNEL_CLOSE_MESSAGE:
|
||||||
|
assert(channelId);
|
||||||
|
message = ChannelMessageV0.encodeChannelCloseMessage(channelId);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case MessageType.CHANNEL_PAYLOAD_MESSAGE:
|
||||||
|
assert(body);
|
||||||
|
assert(channelId);
|
||||||
|
message = ChannelMessageV0.encodeChannelDataMessage(
|
||||||
|
channelId,
|
||||||
|
Buffer.from(body)
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
logger.error('unsupported message type', { messageType });
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.wss.clients.forEach(client => { client.send(message); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
OrbitSimulatorServer,
|
||||||
|
OrbitSimulatorEvent,
|
||||||
|
};
|
|
@ -0,0 +1,6 @@
|
||||||
|
{
|
||||||
|
"name": "management-agent-tests",
|
||||||
|
"scripts": {
|
||||||
|
"test": "mocha -t 40000 *.js"
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue