Compare commits

...

19 Commits

Author SHA1 Message Date
jeremyds f8d4a533ff tempo 2018-08-30 15:15:54 -07:00
jeremyds 1f6483e0ee tempo 2018-08-30 11:52:23 -07:00
jeremyds 3ce765c2dc move metadata setup from managementAgent.js to management/index.js::initManagement 2018-08-29 15:05:03 -07:00
jeremyds 33d94fc375 tests 2018-08-28 16:15:59 -07:00
jeremyds 486459749c ft: ZENKO-716: stop using management code in S3 2018-08-28 16:15:34 -07:00
jeremyds 9d27b34209 ft: ZENKO-714: management agent client in S3.
Now there is a new process handling the management feature we should add
a websocket client in S3 to receive the overlay. To safely move from the
management code in S3 to the full use of the new management process,
this code is run only if the env variable MANAGEGEMENT_USE_AGENT is
defined and equal to one.
2018-08-22 14:01:07 -07:00
jeremyds 4ecb9b6717 ft: ZENKO-714: export applyAndSaveOverlay
This function will be needed by the management agent client in S3.
Export it and take the opportunity to move it to configuration.js.
2018-08-22 14:01:07 -07:00
jeremyds 5c51b5aa19 ft: ZENKO-713: management agent websocket server
This commit creates a websocket server which adds a '/watch' route. The
management agent registers a listener to the new configuration event and
then sends it to its client.
2018-08-22 14:01:07 -07:00
jeremyds 2575134e58 ft: ZENKO-713: overlay event listener
The current management code runs in S3 patches and saves received
overlay. The new management process, which will manage all the
management feature, won't patch and save the new overlay, it is a S3
specific task.

This commit adds a new overlay event listerner from the management
push code (where the new overlay is received). It will allow the
management process to send to its client the new overlay.

A new env varialble is added in this commit. It is temporary used to
remove safely the management feature from the S3 process, keeping the
previous behavior in a first time.
2018-08-22 14:01:07 -07:00
jeremyds ff79b630da ft: ZENKO-713: management agent message type definition.
Define a new message type for the futur communication between the
management agent and its client.
2018-08-22 14:01:07 -07:00
jeremyds a8bcfda3e4 ft: ZENKO-713: fix management::SaveconfigurationVersion error handling 2018-08-22 14:01:07 -07:00
jeremyds 3c4a6b8d95 ft: ZENKO-713: add callback to initManagement
The future management agent process will init the management before to
launch a websocket server. This commit adds an optional callback to the
initManagement function, call once the init is done, with the loaded
overlay as parameter.
2018-08-22 14:01:07 -07:00
jeremyds 6fd3c9f264 ft: ZENKO-713: log msg improvement 2018-08-22 14:01:07 -07:00
jeremyds a9592e7ac3 ft: ZENKO-712: linter errors. 2018-08-22 14:01:07 -07:00
jeremyds d9659627cc ft: ZENKO-712: remove unused configuration item. 2018-08-22 14:01:07 -07:00
jeremyds c1095d2246 ft: ZENKO-712: new process to handle management. 2018-08-22 14:01:07 -07:00
jeremyds 1fbfe90951 ft: ZENKO-712: management push stat, add an error handling
The error was triggered with the new process when shutting down the S3
server, it leaded to an exception.
2018-08-22 14:01:07 -07:00
jeremyds b9d4371ff3 ft: ZENKO-712: handle net error event for management.
Log an error instead of crashing when the management push code fails to
connect to S3.
2018-08-22 14:01:07 -07:00
Thomas Carmet 1234e55622 bumping cloudserver version to 8.1 2018-08-22 14:01:07 -07:00
15 changed files with 513 additions and 52 deletions

View File

@ -227,6 +227,33 @@ stages:
<<: *global-env
- Upload: *upload-artifacts
management-agent-tests:
worker: &s3-pod
type: kube_pod
path: eve/workers/pod.yaml
images:
aggressor: eve/workers/build
s3: "."
vars:
aggressorMemLimit: "2Gi"
s3MemLimit: "1664Mi"
steps:
- Git: *clone
- ShellCommand: *credentials
- ShellCommand: *npm-install
- ShellCommand:
command: |
set -ex
bash wait_for_local_port.bash 8000 40
npm run ft_test
<<: *follow-s3-log
env:
<<: *mongo-vars
<<: *global-env
- Upload: *upload-artifacts
file-ft-tests:
worker:
type: kube_pod

View File

@ -908,6 +908,25 @@ class Config extends EventEmitter {
this.outboundProxy.certs = certObj.certs;
}
this.managementAgent = {};
this.managementAgent.port = 8010;
this.managementAgent.host = 'localhost';
if (config.managementAgent !== undefined) {
if (config.managementAgent.port !== undefined) {
assert(Number.isInteger(config.managementAgent.port)
&& config.managementAgent.port > 0,
'bad config: managementAgent port must be a positive ' +
'integer');
this.managementAgent.port = config.managementAgent.port;
}
if (config.managementAgent.host !== undefined) {
assert.strictEqual(typeof config.managementAgent.host, 'string',
'bad config: management agent host must ' +
'be a string');
this.managementAgent.host = config.managementAgent.host;
}
}
// Ephemeral token to protect the reporting endpoint:
// try inherited from parent first, then hardcoded in conf file,
// then create a fresh one as last resort.
@ -915,7 +934,6 @@ class Config extends EventEmitter {
process.env.REPORT_TOKEN ||
config.reportToken ||
uuid.v4().toString();
this.reportEndpoint = process.env.REPORT_ENDPOINT;
}
_configureBackends() {

View File

@ -0,0 +1,63 @@
const WebSocket = require('ws');
const logger = require('../utilities/logger');
const _config = require('../Config').config;
const { applyAndSaveOverlay } = require('./configuration');
const managementAgentMessageType = {
/** Message that contains the loaded overlay */
NEW_OVERLAY: 1,
};
const CONNECTION_RETRY_TIMEOUT_MS = 5000;
function initManagementClient() {
const host = _config.managementAgent.host;
const port = _config.managementAgent.port;
const ws = new WebSocket(`ws://${host}:${port}/watch`);
ws.on('open', () => {
logger.info('connected with management agent');
});
ws.on('close', (code, reason) => {
logger.info('disconnected from management agent', { reason });
setTimeout(initManagementClient, CONNECTION_RETRY_TIMEOUT_MS);
});
ws.on('error', error => {
logger.error('error on connection with management agent', { error });
});
ws.on('message', data => {
const log = logger.newRequestLogger();
const msg = JSON.parse(data);
if (msg.payload === undefined) {
log.error('message without payload');
return;
}
if (typeof msg.messageType !== 'number') {
log.error('messageType is not an integer', {
type: typeof msg.messageType,
});
return;
}
switch (msg.messageType) {
case managementAgentMessageType.NEW_OVERLAY:
applyAndSaveOverlay(msg.payload, log);
break;
default:
log.error('new overlay message version without payload');
return;
}
});
}
module.exports = {
managementAgentMessageType,
initManagementClient,
};

View File

@ -256,12 +256,21 @@ function saveConfigurationVersion(cachedOverlay, remoteOverlay, log, cb) {
metadata.putObjectMD(managementDatabaseName, objName, remoteOverlay,
{}, log, error => {
if (error) {
log.error('could not save configuration version',
log.error('could not save configuration',
{ configurationVersion: remoteOverlay.version });
cb(error);
return;
}
metadata.putObjectMD(managementDatabaseName,
latestOverlayVersionKey, remoteOverlay.version, {}, log,
error => cb(error, remoteOverlay));
error => {
if (error) {
log.error('could not save configuration version', {
configurationVersion: remoteOverlay.version,
});
}
cb(error, remoteOverlay);
});
});
} else {
log.debug('no remote configuration to cache yet');
@ -300,10 +309,31 @@ function loadCachedOverlay(log, callback) {
});
}
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: err,
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: err,
});
return;
}
log.info('overlay push processed');
});
});
}
module.exports = {
loadCachedOverlay,
managementDatabaseName,
patchConfiguration,
saveConfigurationVersion,
remoteOverlayIsNewer,
applyAndSaveOverlay,
};

View File

@ -0,0 +1,12 @@
/* From RFC6455 which defines some reserved status code ranges, the range
* 4000-4999 are for private use. */
const WS_STATUS_IDDLE = {
code: 4000,
reason: 'does not reply to ping before timeout',
};
const CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS = 15000;
module.exports = {
WS_STATUS_IDDLE,
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
};

View File

@ -65,29 +65,34 @@ function startManagementListeners(instanceId, token) {
*
* @param {werelogs~Logger} log Request-scoped logger to be able to trace
* initialization process
* @param {function} callback Function to call once the overlay is loaded
* (overlay)
*
* @returns {undefined}
*/
function initManagement(log) {
function initManagement(log, callback) {
if ((process.env.REMOTE_MANAGEMENT_DISABLE &&
process.env.REMOTE_MANAGEMENT_DISABLE !== '0')
|| process.env.S3BACKEND === 'mem') {
log.info('remote management disabled');
return;
}
async.waterfall([
cb => metadata.setup(cb),
cb => initManagementDatabase(log, cb),
cb => metadata.getUUID(log, cb),
(instanceId, cb) => initManagementCredentials(
managementEndpoint, instanceId, log, cb),
(instanceId, token, cb) => loadCachedOverlay(log, (err, conf) => {
if (err) {
return cb(err);
}
return patchConfiguration(conf, log,
err => cb(err, instanceId, token));
}),
], (error, instanceId, token) => {
(instanceId, token, cb) => {
loadCachedOverlay(log, (err, overlay) => cb(err, instanceId,
token, overlay));
},
(instanceId, token, overlay, cb) => {
patchConfiguration(overlay, log,
err => cb(err, instanceId, token, overlay));
},
], (error, instanceId, token, overlay) => {
if (error) {
log.error('could not initialize remote management, retrying later',
{ error });
@ -98,6 +103,9 @@ function initManagement(log) {
log.info(`this deployment's Instance ID is ${instanceId}`);
log.end('management init done');
startManagementListeners(instanceId, token);
if (callback) {
callback(overlay);
}
}
});
}

View File

@ -3,15 +3,12 @@ const net = require('net');
const request = require('request');
const { URL } = require('url');
const WebSocket = require('ws');
const assert = require('assert');
const _config = require('../Config').config;
const logger = require('../utilities/logger');
const metadata = require('../metadata/wrapper');
const {
patchConfiguration,
saveConfigurationVersion,
} = require('./configuration');
const { applyAndSaveOverlay } = require('./configuration');
const {
ChannelMessageV0,
@ -28,6 +25,8 @@ const {
const PING_INTERVAL_MS = 10000;
const subprotocols = [ChannelMessageV0.protocolName];
let overlayMessageListener = null;
// No wildcard nor cidr/mask match for now
function createWSAgent(pushEndpoint, env, log) {
const url = new URL(pushEndpoint);
@ -117,6 +116,10 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
},
};
request(fromURL, fromOptions, (err, response, body) => {
if (err) {
logger.error('failed to push stats', { err });
return;
}
ws.send(ChannelMessageV0.encodeMetricsReportMessage(body),
_logError);
}).json();
@ -133,10 +136,11 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
function receiveChannelData(channelId, payload) {
let socket = socketsByChannelId[channelId];
if (!socket) {
socket = net.createConnection({
host: 'localhost',
port: _config.port,
});
const host = process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_HOST
|| 'localhost';
const port = process.env.SECURE_CHANNEL_DEFAULT_FORWARD_TO_PORT
|| _config.port;
socket = net.createConnection(port, host);
socket.on('data', data => {
ws.send(ChannelMessageV0.
@ -149,6 +153,14 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
socket.on('drain', () => {
});
socket.on('error', error => {
logger.error('failed to connect to S3', {
code: error.code,
host: error.address,
port: error.port,
});
});
socket.on('end', () => {
socket.destroy();
socketsByChannelId[channelId] = null;
@ -161,26 +173,6 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
socket.write(payload);
}
function applyAndSaveOverlay(overlay, log) {
patchConfiguration(overlay, log, err => {
if (err) {
log.error('could not apply pushed overlay', {
error: err,
});
return;
}
saveConfigurationVersion(null, overlay, log, err => {
if (err) {
log.error('could not cache overlay version', {
error: err,
});
return;
}
log.info('overlay push processed');
});
});
}
function browserAccessChangeHandler() {
if (!_config.browserAccessEnabled) {
socketsByChannelId.forEach(s => s.close());
@ -228,7 +220,9 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
switch (message.getType()) {
case CONFIG_OVERLAY_MESSAGE:
applyAndSaveOverlay(JSON.parse(message.getPayload()), log);
if (overlayMessageListener) {
overlayMessageListener(message.getPayload());
}
break;
case METRICS_REQUEST_MESSAGE:
pushStats();
@ -249,7 +243,13 @@ function startWSManagementClient(pushEndpoint, instanceId, token) {
});
}
function addOverlayMessageListener(callback) {
assert(typeof callback === 'function');
overlayMessageListener = callback;
}
module.exports = {
createWSAgent,
startWSManagementClient,
addOverlayMessageListener,
};

View File

@ -14,7 +14,7 @@ const { blacklistedPrefixes } = require('../constants');
const api = require('./api/api');
const data = require('./data/wrapper');
const metadata = require('./metadata/wrapper');
const { initManagement } = require('./management');
const { initManagementClient } = require('./management/agentClient');
const routes = arsenal.s3routes.routes;
const websiteEndpoints = _config.websiteEndpoints;
@ -153,11 +153,7 @@ class S3Server {
// TODO this should wait for metadata healthcheck to be ok
// TODO only do this in cluster master
setTimeout(() => {
if (enableRemoteManagement) {
initManagement(logger.newRequestLogger());
}
}, 5000);
initManagementClient();
}
/*

165
managementAgent.js Normal file
View File

@ -0,0 +1,165 @@
const Uuid = require('uuid');
const WebSocket = require('ws');
const logger = require('./lib/utilities/logger');
const { initManagement } = require('./lib/management');
const _config = require('./lib/Config').config;
const { managementAgentMessageType } = require('./lib/management/agentClient');
const { addOverlayMessageListener } = require('./lib/management/push');
const {
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
WS_STATUS_IDDLE,
} = require('./lib/management/constants');
// TODO: auth?
class ManagementAgentServer {
constructor() {
this.port = _config.managementAgent.port || 8010;
this.wss = null;
this.loadedOverlay = null;
this.stop = this.stop.bind(this);
process.on('SIGINT', this.stop);
process.on('SIGHUP', this.stop);
process.on('SIGQUIT', this.stop);
process.on('SIGTERM', this.stop);
process.on('SIGPIPE', () => {});
}
start(_cb) {
const cb = _cb || function noop() {};
/* Define REPORT_TOKEN env variable needed by the management
* module. */
process.env.REPORT_TOKEN = process.env.REPORT_TOKEN
|| _config.reportToken
|| Uuid.v4();
/* The initManegement function retries when it fails. */
return initManagement(logger.newRequestLogger(), overlay => {
let error = null;
if (overlay) {
this.loadedOverlay = overlay;
this.startServer();
} else {
error = new Error('failed to init management');
}
return cb(error);
});
}
stop() {
if (this.wss) {
this.wss.close(() => {
logger.info('server shutdown');
});
}
}
startServer() {
this.wss = new WebSocket.Server({
port: this.port,
clientTracking: true,
path: '/watch',
});
this.wss.on('connection', this.onConnection.bind(this));
this.wss.on('listening', this.onListening.bind(this));
this.wss.on('error', this.onError.bind(this));
setInterval(this.checkBrokenConnections.bind(this),
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
addOverlayMessageListener(this.onNewOverlay.bind(this));
}
onConnection(socket, request) {
function hearthbeat() {
this.isAlive = true;
}
logger.info('client connected to watch route', {
ip: request.connection.remoteAddress,
});
/* eslint-disable no-param-reassign */
socket.isAlive = true;
socket.on('pong', hearthbeat.bind(socket));
if (socket.readyState !== socket.OPEN) {
logger.error('client socket not in ready state', {
state: socket.readyState,
client: socket._socket._peername,
});
return;
}
const msg = {
messageType: managementAgentMessageType.NEW_OVERLAY,
payload: this.loadedOverlay,
};
socket.send(JSON.stringify(msg), error => {
if (error) {
logger.error('failed to send remoteOverlay to client', {
error,
client: socket._socket._peername,
});
}
});
}
onListening() {
logger.info('websocket server listening',
{ port: this.port });
}
onError(error) {
logger.error('websocket server error', { error });
}
onNewOverlay(remoteOverlay) {
this.loadedOverlay = remoteOverlay;
this.wss.clients.forEach(client => {
if (client.readyState !== client.OPEN) {
logger.error('client socket not in ready state', {
state: client.readyState,
client: client._socket._peername,
});
return;
}
const msg = {
messageType: managementAgentMessageType.NEW_OVERLAY,
payload: remoteOverlay,
};
client.send(JSON.stringify(msg), error => {
if (error) {
logger.error('failed to send remoteOverlay to management' +
' agent client', {
error,
client: client._socket._peername,
});
}
});
});
}
checkBrokenConnections() {
this.wss.clients.forEach(client => {
if (!client.isAlive) {
logger.info('close broken connection', {
client: client._socket._peername,
});
client.close(WS_STATUS_IDDLE.code, WS_STATUS_IDDLE.reason);
return;
}
client.isAlive = false;
client.ping();
});
}
}
const server = new ManagementAgentServer();
server.start();

View File

@ -1,6 +1,6 @@
{
"name": "@zenko/cloudserver",
"version": "8.0.0-beta",
"version": "8.1.0-beta",
"description": "Zenko CloudServer, an open-source Node.js implementation of a server handling the Amazon S3 protocol",
"main": "index.js",
"engines": {
@ -84,6 +84,7 @@
"ft_util": "cd tests/functional/utilities && mocha -t 40000 *.js",
"ft_test": "npm-run-all -s ft_awssdk ft_s3cmd ft_s3curl ft_node ft_healthchecks ft_management ft_util",
"ft_search": "cd tests/functional/aws-node-sdk && mocha -t 90000 test/mdSearch",
"ft_management_agent": "cd tests/functional/managementAgent/ && npm test",
"install_ft_deps": "npm install aws-sdk@2.28.0 bluebird@3.3.1 mocha@2.3.4 mocha-junit-reporter@1.11.1 tv4@1.2.7",
"lint": "eslint $(git ls-files '*.js')",
"lint_md": "mdlint $(git ls-files '*.md')",
@ -97,6 +98,7 @@
"start_dmd": "npm-run-all --parallel start_mdserver start_dataserver",
"start_utapi": "node lib/utapi/utapi.js",
"utapi_replay": "node lib/utapi/utapiReplay.js",
"management_agent": "node managementAgent.js",
"test": "CI=true S3BACKEND=mem mocha --recursive tests/unit",
"test_legacy_location": "CI=true S3_LOCATION_FILE=tests/locationConfig/locationConfigLegacy.json S3BACKEND=mem mocha --recursive tests/unit",
"multiple_backend_test": "CI=true S3BACKEND=mem S3DATA=multiple mocha -t 20000 --recursive tests/multipleBackend",

View File

@ -0,0 +1,2 @@
[default]
region = us-east-1

View File

@ -0,0 +1,3 @@
[default]
aws_access_key_id = Y1VC0G0ABJN0R6CY04N2
aws_secret_access_key = e8yosUHlaY46jXPDVKsAttw5StKR0HRtGBePPGBW

View File

@ -0,0 +1,123 @@
'use strict'; // eslint-disable-line strict
const WebSocket = require('ws');
const assert = require('assert');
const logger = require('../../../lib/utilities/logger');
const _config = require('../../../lib/Config').config;
const {
managementAgentMessageType,
} = require('../../../lib/management/agentClient');
const {
patchConfiguration,
} = require('../../../lib/management/configuration');
const {
CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS,
WS_STATUS_IDDLE,
} = require('../../../lib/management/constants');
const metadata = require('../../../lib/metadata/wrapper.js');
function createWs(path) {
const host = _config.managementAgent.host;
const port = _config.managementAgent.port;
return new WebSocket(`ws://${host}:${port}/${path || 'watch'}`);
}
describe('Management process', function testSuite() {
this.timeout(120000);
it('should setup metada', done => {
metadata.setup(done);
});
it('should not listen on others routes than `watch`', done => {
const ws = createWs('wrong_path');
const msg = 'management agent process should not listen this route';
ws.on('open', () => {
logger.error('open');
return done(new Error(msg));
});
ws.on('error', error => {
logger.error('error', { error });
return done();
});
});
it('should listen on `watch` route', done => {
const ws = createWs();
const msg = 'management agent process should listen this route';
ws.on('open', done);
ws.on('error', () => { done(new Error(msg)); });
});
it('should send the loaded overlay as first message', done => {
let firstMsgReceived = false;
const ws = createWs();
ws.on('close', done);
ws.on('error', () => { done(new Error('connection error')); });
ws.on('message', data => {
if (!firstMsgReceived) {
firstMsgReceived = true;
} else {
return;
}
const msg = JSON.parse(data);
assert.strictEqual(
msg.messageType, managementAgentMessageType.NEW_OVERLAY
);
assert(msg.payload);
patchConfiguration(msg.payload, logger.newRequestLogger(), done);
});
});
it('should send the new overlay after the first message', done => {
const ws = createWs();
let firstMsgReceived = false;
ws.on('close', done);
ws.on('error', () => { done(new Error('connection error')); });
ws.on('message', data => {
if (!firstMsgReceived) {
firstMsgReceived = true;
return;
}
const msg = JSON.parse(data);
assert.strictEqual(
msg.messageType, managementAgentMessageType.NEW_OVERLAY
);
assert(msg.payload);
patchConfiguration(msg.payload, logger.newRequestLogger(), done);
});
});
it('should terminate the connection when a client does not answer ping',
done => {
this.timeout(2 * CHECK_BROKEN_CONNECTIONS_FREQUENCY_MS);
const ws = createWs();
ws.on('close', (code, reason) => {
assert.strictEqual(code, WS_STATUS_IDDLE.code);
assert.strictEqual(reason, WS_STATUS_IDDLE.reason);
done();
});
ws.on('error', error => {
done(new Error('unexpected event'));
});
ws.on('message', data => {
/* Ugly eventTarget internal fields hacking to avoid this web
* socket to answer to ping messages. It will make the management
* agent to close the connection after a timeout.
* Defining an onPing event does not help, this internal function
* is still called. */
ws._receiver._events.ping = function noop() {};
});
});
});

View File

@ -0,0 +1,6 @@
{
"tests": {
"files": [ "/test" ],
"on": "aggressor"
}
}

View File

@ -0,0 +1,6 @@
{
"name": "management-agent-tests",
"scripts": {
"test": "mocha -t 40000 *.js"
}
}