Compare commits

...

2 Commits

Author SHA1 Message Date
Guillaume Gimenez 5abca4dc5d FT: Metadata Proxy Server: MongoDB related fixes 2018-06-06 15:36:27 -07:00
Guillaume Gimenez fb7d3ed652 FT: Metadata Proxy Server 2018-06-06 15:36:27 -07:00
5 changed files with 629 additions and 0 deletions

View File

@ -105,6 +105,9 @@ module.exports = {
LogConsumer: LogConsumer:
require('./lib/storage/metadata/mongoclient/LogConsumer'), require('./lib/storage/metadata/mongoclient/LogConsumer'),
}, },
proxy: {
Server: require('./lib/storage/metadata/proxy/Server'),
},
}, },
data: { data: {
file: { file: {

View File

@ -0,0 +1,317 @@
'use strict'; // eslint-disable-line strict
const errors = require('../../../errors');
const { getURIComponents, getRequestBody, sendResponse } = require('./utils');
function desUnderscoreize(underscoredObj) {
const filtered = {};
const filter = /^_/;
const obj = underscoredObj || {};
Object.keys(obj).forEach(key => {
filtered[key.replace(filter, '')] = obj[key];
});
return filtered;
}
class BucketdRoutes {
/**
* Create a new Bucketd routes instance
* This class implements the bucketd Metadata protocol and is used in
* the Metadata Proxy Server to implement this protocol on top of
* various metadata backends.
*
* Implementation note: the adaptations performed in the methods of
* the class MetadataWrapper are not required in this context.
* For this reason, the methods of the `client' instance are directly
* called from this class, somewhat defeating the encapsulation of the
* wrapper.
*
* @param {Arsenal.storage.metadata.MetadataWrapper} metadataWrapper - to
* be used as a translation target for the bucketd protocol.
* @param {werelogs.Logger} logger -
*/
constructor(metadataWrapper, logger) {
this._metadataWrapper = metadataWrapper;
this._logger = logger;
}
// Metadata Wrapper's wrapper
// `attributes' context methods
_getBucketAttributes(req, res, bucketName, logger) {
return this._metadataWrapper.client.getBucketAttributes(
bucketName, logger, (err, data) =>
sendResponse(req, res, logger, err,
desUnderscoreize(data))
);
}
_putBucketAttributes(req, res, bucketName, data, logger) {
return this._metadataWrapper.client.putBucketAttributes(
bucketName, data, logger, err =>
sendResponse(req, res, logger, err));
}
// `bucket' context methods
_createBucket(req, res, bucketName, data, logger) {
return this._metadataWrapper.client.createBucket(
bucketName, data, logger, err =>
sendResponse(req, res, logger, err));
}
_deleteBucket(req, res, bucketName, logger) {
return this._metadataWrapper.client.deleteBucket(
bucketName, logger, err =>
sendResponse(req, res, logger, err));
}
_putObject(req, res, bucketName, objectName, objectValue, params, logger) {
return this._metadataWrapper.client.putObject(
bucketName, objectName, JSON.parse(objectValue),
params, logger, (err, data) =>
sendResponse(req, res, logger, err, data));
}
_getObject(req, res, bucketName, objectName, params, logger) {
return this._metadataWrapper.client.getObject(
bucketName, objectName, params, logger, (err, data) =>
sendResponse(req, res, logger, err, data));
}
_deleteObject(req, res, bucketName, objectName, params, logger) {
return this._metadataWrapper.client.deleteObject(
bucketName, objectName, params, logger, (err, data) =>
sendResponse(req, res, logger, err, data));
}
_listObject(req, res, bucketName, params, logger) {
const listingParameters = params || {};
if (listingParameters.listingType === undefined) {
listingParameters.listingType = 'Delimiter';
}
if (listingParameters.maxKeys) {
listingParameters.maxKeys = Number.parseInt(params.maxKeys, 10);
}
return this._metadataWrapper.client.listObject(
bucketName, listingParameters, logger, (err, data) =>
sendResponse(req, res, logger, err, data));
}
// Misc
/* //XXX sub-optimal
* listing retrieved from the metadataWrapper is not well formatted for
* the purpose of this server. The `value' fields are stringified twice and
* have to be deserialized here before being sent to the client.
*/
_destringifyValues(data) {
const newContents = data.Contents.map(entry => {
const value = JSON.parse(entry.value);
return { key: entry.key, value };
});
const alteredData = data;
alteredData.Contents = newContents;
return alteredData;
}
_createRequestLogger(req) {
const uids = req.headers['x-scal-request-uids'];
const logger = uids === undefined ?
this._logger.newRequestLogger() :
this._logger.newRequestLoggerFromSerializedUids(uids);
logger.trace('new request', { method: req.method, url: req.url });
return logger;
}
// Internal routes
/**
* Handle routes related to operations on bucket attributes
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {object} uriComponents -
* @param {werelogs.Logger} logger -
* @return {undefined}
*/
_attributesRoutes(req, res, uriComponents, logger) {
if (uriComponents.bucketName === undefined) {
logger.error('Missing bucket name for attributes route',
{ uriComponents });
return sendResponse(req, res, logger, errors.BadRequest);
}
switch (req.method) {
case 'GET':
return this._getBucketAttributes(
req, res,
uriComponents.bucketName, logger, (err, attrs) =>
sendResponse(req, res, logger, err, attrs));
case 'POST':
return getRequestBody(req, (err, body) => {
if (err) {
return sendResponse(req, res, logger, err);
}
return this._putBucketAttributes(
req, res,
uriComponents.bucketName, body, logger, err =>
sendResponse(req, res, logger, err));
});
default:
return sendResponse(req, res, logger, errors.RouteNotFound);
}
}
/**
* Handle routes related to operations on buckets
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {object} uriComponents -
* @param {werelogs.Logger} logger -
* @return {undefined}
*/
_bucketRoutes(req, res, uriComponents, logger) {
if (uriComponents.bucketName === undefined) {
logger.error('Missing bucket name for bucket route',
{ uriComponents });
return sendResponse(req, res, logger, errors.BadRequest);
}
switch (req.method) {
case 'GET':
return this._listObject(req, res,
uriComponents.bucketName,
uriComponents.options,
logger);
case 'DELETE':
return this._deleteBucket(req, res,
uriComponents.bucketName, logger);
case 'POST':
return getRequestBody(req, (err, body) => {
if (err) {
return sendResponse(req, res, logger, err);
}
return this._createBucket(req, res,
uriComponents.bucketName,
body, logger);
});
default:
return sendResponse(req, res, logger, errors.RouteNotFound);
}
}
/**
* Handle routes related to operations on objects
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {object} uriComponents -
* @param {werelogs.Logger} logger -
* @return {undefined}
*/
_objectRoutes(req, res, uriComponents, logger) {
if (uriComponents.bucketName === undefined) {
logger.error('Missing bucket name for bucket route',
{ uriComponents });
return sendResponse(req, res, logger, errors.BadRequest);
}
switch (req.method) {
case 'GET':
return this._getObject(req, res,
uriComponents.bucketName,
uriComponents.objectName,
uriComponents.options,
logger);
case 'DELETE':
return this._deleteObject(req, res,
uriComponents.bucketName,
uriComponents.objectName,
uriComponents.options,
logger);
case 'POST':
return getRequestBody(req, (err, body) =>
this._putObject(req, res,
uriComponents.bucketName,
uriComponents.objectName,
body,
uriComponents.options,
logger));
default:
return sendResponse(req, res, logger, errors.RouteNotFound);
}
}
/**
* Handle default routes. e.g. URI starting with /default/
* (or anything excepted an underscore)
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {object} uriComponents -
* @param {werelogs.Logger} logger -
* @return {undefined}
*/
_defaultRoutes(req, res, uriComponents, logger) {
switch (uriComponents.context) {
case 'leader':
case 'informations':
case 'parallel':
logger.trace(`${uriComponents.context} operation`);
return sendResponse(req, res, logger, errors.NotImplemented);
case 'metadataInformation':
return sendResponse(req, res, logger, undefined,
'{"metadataVersion":2}');
case 'bucket':
logger.trace(`${uriComponents.context} operation`);
if (uriComponents.objectName) {
return this._objectRoutes(req, res, uriComponents, logger);
}
return this._bucketRoutes(req, res, uriComponents, logger);
case 'attributes':
logger.trace(`${uriComponents.context} operation`);
return this._attributesRoutes(req, res, uriComponents, logger);
default:
logger.error('invalid URI', { uriComponents });
return sendResponse(req, res, logger, errors.RouteNotFound);
}
}
/**
* Handle admin routes. e.g. URI starting with /_/
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {object} uriComponents -
* @param {werelogs.Logger} logger -
* @return {undefined}
*/
_adminRoutes(req, res, uriComponents, logger) {
return sendResponse(req, res, logger, errors.NotImplemented);
}
// The route dispatching method
/**
* dispatch the HTTP request to the appropriate handling function.
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @return {undefined}
*/
dispatch(req, res) {
const adminNamespace = '_';
const logger = this._createRequestLogger(req);
const uriComponents = getURIComponents(req.url, logger);
if (!uriComponents) {
return sendResponse(req, res, logger, errors.BadRequest);
}
switch (uriComponents.namespace) {
case adminNamespace:
return this._adminRoutes(req, res, uriComponents, logger);
default: // coincidently matches the `default' literal namespace as well
return this._defaultRoutes(req, res, uriComponents, logger);
}
}
}
module.exports = BucketdRoutes;

View File

@ -0,0 +1,33 @@
# Metatada Proxy Server
## Design goals
## Design choices
## Implementation details
## How to run the proxy server
```js
const werelogs = require('werelogs');
const MetadataWrapper = require('arsenal')
.storage.metadata.MetadataWrapper;
const Server = require('arsenal')
.storage.metadata.proxy.Server;
const logger = new werelogs.Logger('MetadataProxyServer',
'debug', 'debug');
const metadataWrapper = new MetadataWrapper('mem', {},
null, logger);
const server = new Server(metadataWrapper,
{
port: 9001,
workers: 1,
},
logger);
server.start(() => {
logger.info('Metadata Proxy Server successfully started. ' +
`Using the ${metadataWrapper.implName} backend`);
});
```

View File

@ -0,0 +1,105 @@
'use strict'; // eslint-disable-line strict
const cluster = require('cluster');
const HttpServer = require('../../../network/http/server');
const BucketdRoutes = require('./BucketdRoutes');
const requiresOneWorker = {
// in memory kvs storage is not shared across processes
memorybucket: true,
};
class Server {
/**
* Create a new Metadata Proxy Server instance
*
* The Metadata Proxy Server is an HTTP server that translates
* requests of the bucketd sub-protocol into function calls to
* a properly configured MetadataWrapper instance. Such instance
* can use any of the available metadata backends available.
*
* @param {arsenal.storage.metadata.MetadataWrapper} metadataWrapper -
* @param {Object} configuration -
* @param {number} configuration.port -
* @param {number} configuration.workers -
* @param {werelogs.Logger} logger -
*/
constructor(metadataWrapper, configuration, logger) {
this._configuration = configuration;
if (requiresOneWorker[metadataWrapper.implName] &&
this._configuration.workers !== 1) {
logger.warn('This metadata backend requires only one worker',
{ metadataBackend: metadataWrapper.implName });
this._configuration.workers = 1;
}
this._logger = logger;
this._metadataWrapper = metadataWrapper;
this._proxyRoutes = new BucketdRoutes(metadataWrapper, this._logger);
this._httpServer = null;
this._installSignalHandlers();
}
_cleanup() {
if (cluster.isWorker) {
this._logger.info('Server worker shutting down...');
this._httpServer.stop();
} else {
this._logger.info('Server shutting down...');
}
return process.exit(0);
}
_installSignalHandlers() {
process.on('SIGINT', () => { this._cleanup(); });
process.on('SIGHUP', () => { this._cleanup(); });
process.on('SIGQUIT', () => { this._cleanup(); });
process.on('SIGTERM', () => { this._cleanup(); });
process.on('SIGPIPE', () => {});
}
/**
* Start the Metadata Proxy Server instance
*
* @param {Function} cb - called with no argument when the onListening event
* is triggered
* @return {undefined}
*/
start(cb) {
if (cluster.isMaster) {
for (let i = 0; i < this._configuration.workers; ++i) {
cluster.fork();
}
cluster.on('disconnect', worker => {
this._logger
.info(`worker ${worker.process.pid} exited, respawning.`);
cluster.fork();
});
} else {
this._httpServer = new HttpServer(this._configuration.port,
this._logger);
if (this._configuration.bindAddress) {
this._httpServer.setBindAddress(
this._configuration.bindAddress);
}
this._httpServer
.onRequest((req, res) => this._proxyRoutes.dispatch(req, res))
.onListening(() => {
this._logger.info(
'Metadata Proxy Server now listening on' +
` port ${this._configuration.port}`);
if (cb) {
return this._metadataWrapper.setup(cb);
}
return this._metadataWrapper.setup(() => {
this._logger.info('MetadataWrapper setup complete.');
});
})
.start();
}
}
}
module.exports = Server;

View File

@ -0,0 +1,171 @@
const url = require('url');
const querystring = require('querystring');
const errors = require('../../../errors');
/**
* Extracts components from URI.
* @param {string} uri - uri part of the received request
* @param {werelogs.Logger} logger -
* @return {object} ret - contains up to 4 string properties,
* @return {string} ret.namespace
* @return {string} ret.context
* @return {string} ret.bucketName
* @return {string} ret.objectName
*/
function getURIComponents(uri, logger) {
try {
if (uri.charAt(0) !== '/') {
return {};
}
const { pathname, query } = url.parse(uri);
const options = query ? querystring.parse(query) : {};
const typeIndex = pathname.indexOf('/', 1);
const bucketIndex = pathname.indexOf('/', typeIndex + 1);
const objectIndex = pathname.indexOf('/', bucketIndex + 1);
if (typeIndex === -1 || typeIndex === pathname.length - 1) {
return {};
}
if (bucketIndex === -1) {
return {
namespace: pathname.substring(1, typeIndex),
context: pathname.substring(typeIndex + 1),
};
}
if (bucketIndex === pathname.length - 1) {
return {
namespace: pathname.substring(1, typeIndex),
context: pathname.substring(typeIndex + 1, bucketIndex),
};
}
if (objectIndex === -1) {
return {
namespace: pathname.substring(1, typeIndex),
context: pathname.substring(typeIndex + 1, bucketIndex),
bucketName: pathname.substring(bucketIndex + 1),
options,
};
}
if (objectIndex === pathname.length - 1) {
return {
namespace: pathname.substring(1, typeIndex),
context: pathname.substring(typeIndex + 1, bucketIndex),
bucketName: pathname.substring(bucketIndex + 1, objectIndex),
options,
};
}
return {
namespace: pathname.substring(1, typeIndex),
context: pathname.substring(typeIndex + 1, bucketIndex),
bucketName: pathname.substring(bucketIndex + 1, objectIndex),
objectName: decodeURIComponent(pathname.substring(objectIndex + 1)),
options,
};
} catch (ex) {
logger.error('Invalid URI: failed to parse',
{ uri, error: ex, errorStack: ex.stack });
return null;
}
}
/**
* Extracts the body of the request through a callback
* @param {http.IncomingMessage} request - request received from bucketclient
* @param {Function} cb - function which has an interest in the request body.
* The first parameter is err and may be falsey
* The second parameter is the body of the request
* @return {undefined}
*/
function getRequestBody(request, cb) {
const body = [];
let bodyLen = 0;
request.on('data', data => {
body.push(data);
bodyLen += data.length;
}).on('error', cb).on('end', () => {
cb(null, Buffer.concat(body, bodyLen).toString());
});
}
/**
* Emit a log entry corresponding to the end of the request
*
* @param {werelogs.Logger} logger - instance of the logger that will emit the
* log entry
* @param {http.IncomingMessage} req - request being processed
* @param {object} statusCode - HTTP status code sent back to the client
* @param {object} statusMessage - HTTP status message sent back to the client
* @return {undefined}
*/
function _logRequestEnd(logger, req, statusCode, statusMessage) {
const info = {
clientIp: req.socket.remoteAddress,
clientPort: req.socket.remotePort,
httpMethod: req.method,
httpURL: req.url,
httpCode: statusCode,
httpMessage: statusMessage,
};
logger.end('finished handling request', info);
}
/**
* Request processing exit point, sends back to the client the specified data
* and/or error code
*
* @param {http.IncomingMessage} req - request being processed
* @param {http.OutgoingMessage} res - response associated to the request
* @param {werelogs.Logger} log - instance of the logger to use
* @param {Arsenal.Error} err - if not null, defines the HTTP status
* code and message
* @param {string} data - if not null, used as the response body. If `data'
* isn't a string, it's considered as a JSON object and
* it's content get serialized before being sent.
* @return {undefined}
*/
function sendResponse(req, res, log, err, data) {
let statusCode;
let statusMessage;
if (err) {
statusCode = err.code;
statusMessage = err.message;
} else {
statusCode = errors.ok.code;
statusMessage = errors.ok.message;
}
if (data) {
let resData = data;
if (typeof resData === 'object') {
resData = JSON.stringify(data);
} else if (typeof resData === 'number') {
resData = resData.toString();
}
/*
* Encoding data to binary provides a hot path to write data
* directly to the socket, without node.js trying to encode the data
* over and over again.
*/
const rawData = Buffer.from(resData, 'utf8');
/*
* Using Buffer.bytelength is not required here because data is binary
* encoded, data.length would give us the exact byte length
*/
res.writeHead(statusCode, statusMessage, {
'content-length': rawData.length,
});
res.write(rawData);
} else {
res.writeHead(statusCode, statusMessage, { 'content-length': 0 });
}
return res.end(() => {
_logRequestEnd(log, req, statusCode, statusMessage);
});
}
module.exports = {
getURIComponents,
getRequestBody,
sendResponse,
};