Compare commits
2 Commits
developmen
...
ft/ZENKO-1
Author | SHA1 | Date |
---|---|---|
Guillaume Gimenez | 5abca4dc5d | |
Guillaume Gimenez | fb7d3ed652 |
3
index.js
3
index.js
|
@ -105,6 +105,9 @@ module.exports = {
|
||||||
LogConsumer:
|
LogConsumer:
|
||||||
require('./lib/storage/metadata/mongoclient/LogConsumer'),
|
require('./lib/storage/metadata/mongoclient/LogConsumer'),
|
||||||
},
|
},
|
||||||
|
proxy: {
|
||||||
|
Server: require('./lib/storage/metadata/proxy/Server'),
|
||||||
|
},
|
||||||
},
|
},
|
||||||
data: {
|
data: {
|
||||||
file: {
|
file: {
|
||||||
|
|
|
@ -0,0 +1,317 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
|
const errors = require('../../../errors');
|
||||||
|
const { getURIComponents, getRequestBody, sendResponse } = require('./utils');
|
||||||
|
|
||||||
|
function desUnderscoreize(underscoredObj) {
|
||||||
|
const filtered = {};
|
||||||
|
const filter = /^_/;
|
||||||
|
const obj = underscoredObj || {};
|
||||||
|
Object.keys(obj).forEach(key => {
|
||||||
|
filtered[key.replace(filter, '')] = obj[key];
|
||||||
|
});
|
||||||
|
return filtered;
|
||||||
|
}
|
||||||
|
|
||||||
|
class BucketdRoutes {
|
||||||
|
/**
|
||||||
|
* Create a new Bucketd routes instance
|
||||||
|
* This class implements the bucketd Metadata protocol and is used in
|
||||||
|
* the Metadata Proxy Server to implement this protocol on top of
|
||||||
|
* various metadata backends.
|
||||||
|
*
|
||||||
|
* Implementation note: the adaptations performed in the methods of
|
||||||
|
* the class MetadataWrapper are not required in this context.
|
||||||
|
* For this reason, the methods of the `client' instance are directly
|
||||||
|
* called from this class, somewhat defeating the encapsulation of the
|
||||||
|
* wrapper.
|
||||||
|
*
|
||||||
|
* @param {Arsenal.storage.metadata.MetadataWrapper} metadataWrapper - to
|
||||||
|
* be used as a translation target for the bucketd protocol.
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
*/
|
||||||
|
constructor(metadataWrapper, logger) {
|
||||||
|
this._metadataWrapper = metadataWrapper;
|
||||||
|
this._logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Metadata Wrapper's wrapper
|
||||||
|
|
||||||
|
// `attributes' context methods
|
||||||
|
|
||||||
|
_getBucketAttributes(req, res, bucketName, logger) {
|
||||||
|
return this._metadataWrapper.client.getBucketAttributes(
|
||||||
|
bucketName, logger, (err, data) =>
|
||||||
|
sendResponse(req, res, logger, err,
|
||||||
|
desUnderscoreize(data))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
_putBucketAttributes(req, res, bucketName, data, logger) {
|
||||||
|
return this._metadataWrapper.client.putBucketAttributes(
|
||||||
|
bucketName, data, logger, err =>
|
||||||
|
sendResponse(req, res, logger, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
// `bucket' context methods
|
||||||
|
|
||||||
|
_createBucket(req, res, bucketName, data, logger) {
|
||||||
|
return this._metadataWrapper.client.createBucket(
|
||||||
|
bucketName, data, logger, err =>
|
||||||
|
sendResponse(req, res, logger, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
_deleteBucket(req, res, bucketName, logger) {
|
||||||
|
return this._metadataWrapper.client.deleteBucket(
|
||||||
|
bucketName, logger, err =>
|
||||||
|
sendResponse(req, res, logger, err));
|
||||||
|
}
|
||||||
|
|
||||||
|
_putObject(req, res, bucketName, objectName, objectValue, params, logger) {
|
||||||
|
return this._metadataWrapper.client.putObject(
|
||||||
|
bucketName, objectName, JSON.parse(objectValue),
|
||||||
|
params, logger, (err, data) =>
|
||||||
|
sendResponse(req, res, logger, err, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
_getObject(req, res, bucketName, objectName, params, logger) {
|
||||||
|
return this._metadataWrapper.client.getObject(
|
||||||
|
bucketName, objectName, params, logger, (err, data) =>
|
||||||
|
sendResponse(req, res, logger, err, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
_deleteObject(req, res, bucketName, objectName, params, logger) {
|
||||||
|
return this._metadataWrapper.client.deleteObject(
|
||||||
|
bucketName, objectName, params, logger, (err, data) =>
|
||||||
|
sendResponse(req, res, logger, err, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
_listObject(req, res, bucketName, params, logger) {
|
||||||
|
const listingParameters = params || {};
|
||||||
|
if (listingParameters.listingType === undefined) {
|
||||||
|
listingParameters.listingType = 'Delimiter';
|
||||||
|
}
|
||||||
|
if (listingParameters.maxKeys) {
|
||||||
|
listingParameters.maxKeys = Number.parseInt(params.maxKeys, 10);
|
||||||
|
}
|
||||||
|
return this._metadataWrapper.client.listObject(
|
||||||
|
bucketName, listingParameters, logger, (err, data) =>
|
||||||
|
sendResponse(req, res, logger, err, data));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Misc
|
||||||
|
/* //XXX sub-optimal
|
||||||
|
* listing retrieved from the metadataWrapper is not well formatted for
|
||||||
|
* the purpose of this server. The `value' fields are stringified twice and
|
||||||
|
* have to be deserialized here before being sent to the client.
|
||||||
|
*/
|
||||||
|
_destringifyValues(data) {
|
||||||
|
const newContents = data.Contents.map(entry => {
|
||||||
|
const value = JSON.parse(entry.value);
|
||||||
|
return { key: entry.key, value };
|
||||||
|
});
|
||||||
|
const alteredData = data;
|
||||||
|
alteredData.Contents = newContents;
|
||||||
|
return alteredData;
|
||||||
|
}
|
||||||
|
|
||||||
|
_createRequestLogger(req) {
|
||||||
|
const uids = req.headers['x-scal-request-uids'];
|
||||||
|
const logger = uids === undefined ?
|
||||||
|
this._logger.newRequestLogger() :
|
||||||
|
this._logger.newRequestLoggerFromSerializedUids(uids);
|
||||||
|
logger.trace('new request', { method: req.method, url: req.url });
|
||||||
|
return logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Internal routes
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle routes related to operations on bucket attributes
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {object} uriComponents -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_attributesRoutes(req, res, uriComponents, logger) {
|
||||||
|
if (uriComponents.bucketName === undefined) {
|
||||||
|
logger.error('Missing bucket name for attributes route',
|
||||||
|
{ uriComponents });
|
||||||
|
return sendResponse(req, res, logger, errors.BadRequest);
|
||||||
|
}
|
||||||
|
switch (req.method) {
|
||||||
|
case 'GET':
|
||||||
|
return this._getBucketAttributes(
|
||||||
|
req, res,
|
||||||
|
uriComponents.bucketName, logger, (err, attrs) =>
|
||||||
|
sendResponse(req, res, logger, err, attrs));
|
||||||
|
case 'POST':
|
||||||
|
return getRequestBody(req, (err, body) => {
|
||||||
|
if (err) {
|
||||||
|
return sendResponse(req, res, logger, err);
|
||||||
|
}
|
||||||
|
return this._putBucketAttributes(
|
||||||
|
req, res,
|
||||||
|
uriComponents.bucketName, body, logger, err =>
|
||||||
|
sendResponse(req, res, logger, err));
|
||||||
|
});
|
||||||
|
default:
|
||||||
|
return sendResponse(req, res, logger, errors.RouteNotFound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle routes related to operations on buckets
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {object} uriComponents -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_bucketRoutes(req, res, uriComponents, logger) {
|
||||||
|
if (uriComponents.bucketName === undefined) {
|
||||||
|
logger.error('Missing bucket name for bucket route',
|
||||||
|
{ uriComponents });
|
||||||
|
return sendResponse(req, res, logger, errors.BadRequest);
|
||||||
|
}
|
||||||
|
switch (req.method) {
|
||||||
|
case 'GET':
|
||||||
|
return this._listObject(req, res,
|
||||||
|
uriComponents.bucketName,
|
||||||
|
uriComponents.options,
|
||||||
|
logger);
|
||||||
|
case 'DELETE':
|
||||||
|
return this._deleteBucket(req, res,
|
||||||
|
uriComponents.bucketName, logger);
|
||||||
|
case 'POST':
|
||||||
|
return getRequestBody(req, (err, body) => {
|
||||||
|
if (err) {
|
||||||
|
return sendResponse(req, res, logger, err);
|
||||||
|
}
|
||||||
|
return this._createBucket(req, res,
|
||||||
|
uriComponents.bucketName,
|
||||||
|
body, logger);
|
||||||
|
});
|
||||||
|
default:
|
||||||
|
return sendResponse(req, res, logger, errors.RouteNotFound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle routes related to operations on objects
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {object} uriComponents -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_objectRoutes(req, res, uriComponents, logger) {
|
||||||
|
if (uriComponents.bucketName === undefined) {
|
||||||
|
logger.error('Missing bucket name for bucket route',
|
||||||
|
{ uriComponents });
|
||||||
|
return sendResponse(req, res, logger, errors.BadRequest);
|
||||||
|
}
|
||||||
|
switch (req.method) {
|
||||||
|
case 'GET':
|
||||||
|
return this._getObject(req, res,
|
||||||
|
uriComponents.bucketName,
|
||||||
|
uriComponents.objectName,
|
||||||
|
uriComponents.options,
|
||||||
|
logger);
|
||||||
|
case 'DELETE':
|
||||||
|
return this._deleteObject(req, res,
|
||||||
|
uriComponents.bucketName,
|
||||||
|
uriComponents.objectName,
|
||||||
|
uriComponents.options,
|
||||||
|
logger);
|
||||||
|
case 'POST':
|
||||||
|
return getRequestBody(req, (err, body) =>
|
||||||
|
this._putObject(req, res,
|
||||||
|
uriComponents.bucketName,
|
||||||
|
uriComponents.objectName,
|
||||||
|
body,
|
||||||
|
uriComponents.options,
|
||||||
|
logger));
|
||||||
|
default:
|
||||||
|
return sendResponse(req, res, logger, errors.RouteNotFound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle default routes. e.g. URI starting with /default/
|
||||||
|
* (or anything excepted an underscore)
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {object} uriComponents -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_defaultRoutes(req, res, uriComponents, logger) {
|
||||||
|
switch (uriComponents.context) {
|
||||||
|
case 'leader':
|
||||||
|
case 'informations':
|
||||||
|
case 'parallel':
|
||||||
|
logger.trace(`${uriComponents.context} operation`);
|
||||||
|
return sendResponse(req, res, logger, errors.NotImplemented);
|
||||||
|
case 'metadataInformation':
|
||||||
|
return sendResponse(req, res, logger, undefined,
|
||||||
|
'{"metadataVersion":2}');
|
||||||
|
case 'bucket':
|
||||||
|
logger.trace(`${uriComponents.context} operation`);
|
||||||
|
if (uriComponents.objectName) {
|
||||||
|
return this._objectRoutes(req, res, uriComponents, logger);
|
||||||
|
}
|
||||||
|
return this._bucketRoutes(req, res, uriComponents, logger);
|
||||||
|
case 'attributes':
|
||||||
|
logger.trace(`${uriComponents.context} operation`);
|
||||||
|
return this._attributesRoutes(req, res, uriComponents, logger);
|
||||||
|
default:
|
||||||
|
logger.error('invalid URI', { uriComponents });
|
||||||
|
return sendResponse(req, res, logger, errors.RouteNotFound);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handle admin routes. e.g. URI starting with /_/
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {object} uriComponents -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
_adminRoutes(req, res, uriComponents, logger) {
|
||||||
|
return sendResponse(req, res, logger, errors.NotImplemented);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The route dispatching method
|
||||||
|
|
||||||
|
/**
|
||||||
|
* dispatch the HTTP request to the appropriate handling function.
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
dispatch(req, res) {
|
||||||
|
const adminNamespace = '_';
|
||||||
|
const logger = this._createRequestLogger(req);
|
||||||
|
const uriComponents = getURIComponents(req.url, logger);
|
||||||
|
if (!uriComponents) {
|
||||||
|
return sendResponse(req, res, logger, errors.BadRequest);
|
||||||
|
}
|
||||||
|
switch (uriComponents.namespace) {
|
||||||
|
case adminNamespace:
|
||||||
|
return this._adminRoutes(req, res, uriComponents, logger);
|
||||||
|
default: // coincidently matches the `default' literal namespace as well
|
||||||
|
return this._defaultRoutes(req, res, uriComponents, logger);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = BucketdRoutes;
|
|
@ -0,0 +1,33 @@
|
||||||
|
# Metatada Proxy Server
|
||||||
|
|
||||||
|
## Design goals
|
||||||
|
|
||||||
|
## Design choices
|
||||||
|
|
||||||
|
## Implementation details
|
||||||
|
|
||||||
|
## How to run the proxy server
|
||||||
|
|
||||||
|
```js
|
||||||
|
const werelogs = require('werelogs');
|
||||||
|
const MetadataWrapper = require('arsenal')
|
||||||
|
.storage.metadata.MetadataWrapper;
|
||||||
|
const Server = require('arsenal')
|
||||||
|
.storage.metadata.proxy.Server;
|
||||||
|
|
||||||
|
const logger = new werelogs.Logger('MetadataProxyServer',
|
||||||
|
'debug', 'debug');
|
||||||
|
const metadataWrapper = new MetadataWrapper('mem', {},
|
||||||
|
null, logger);
|
||||||
|
const server = new Server(metadataWrapper,
|
||||||
|
{
|
||||||
|
port: 9001,
|
||||||
|
workers: 1,
|
||||||
|
},
|
||||||
|
logger);
|
||||||
|
server.start(() => {
|
||||||
|
logger.info('Metadata Proxy Server successfully started. ' +
|
||||||
|
`Using the ${metadataWrapper.implName} backend`);
|
||||||
|
});
|
||||||
|
|
||||||
|
```
|
|
@ -0,0 +1,105 @@
|
||||||
|
'use strict'; // eslint-disable-line strict
|
||||||
|
|
||||||
|
const cluster = require('cluster');
|
||||||
|
|
||||||
|
const HttpServer = require('../../../network/http/server');
|
||||||
|
const BucketdRoutes = require('./BucketdRoutes');
|
||||||
|
|
||||||
|
const requiresOneWorker = {
|
||||||
|
// in memory kvs storage is not shared across processes
|
||||||
|
memorybucket: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
class Server {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new Metadata Proxy Server instance
|
||||||
|
*
|
||||||
|
* The Metadata Proxy Server is an HTTP server that translates
|
||||||
|
* requests of the bucketd sub-protocol into function calls to
|
||||||
|
* a properly configured MetadataWrapper instance. Such instance
|
||||||
|
* can use any of the available metadata backends available.
|
||||||
|
*
|
||||||
|
* @param {arsenal.storage.metadata.MetadataWrapper} metadataWrapper -
|
||||||
|
* @param {Object} configuration -
|
||||||
|
* @param {number} configuration.port -
|
||||||
|
* @param {number} configuration.workers -
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
*/
|
||||||
|
constructor(metadataWrapper, configuration, logger) {
|
||||||
|
this._configuration = configuration;
|
||||||
|
if (requiresOneWorker[metadataWrapper.implName] &&
|
||||||
|
this._configuration.workers !== 1) {
|
||||||
|
logger.warn('This metadata backend requires only one worker',
|
||||||
|
{ metadataBackend: metadataWrapper.implName });
|
||||||
|
this._configuration.workers = 1;
|
||||||
|
}
|
||||||
|
this._logger = logger;
|
||||||
|
this._metadataWrapper = metadataWrapper;
|
||||||
|
|
||||||
|
this._proxyRoutes = new BucketdRoutes(metadataWrapper, this._logger);
|
||||||
|
this._httpServer = null;
|
||||||
|
this._installSignalHandlers();
|
||||||
|
}
|
||||||
|
|
||||||
|
_cleanup() {
|
||||||
|
if (cluster.isWorker) {
|
||||||
|
this._logger.info('Server worker shutting down...');
|
||||||
|
this._httpServer.stop();
|
||||||
|
} else {
|
||||||
|
this._logger.info('Server shutting down...');
|
||||||
|
}
|
||||||
|
return process.exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
_installSignalHandlers() {
|
||||||
|
process.on('SIGINT', () => { this._cleanup(); });
|
||||||
|
process.on('SIGHUP', () => { this._cleanup(); });
|
||||||
|
process.on('SIGQUIT', () => { this._cleanup(); });
|
||||||
|
process.on('SIGTERM', () => { this._cleanup(); });
|
||||||
|
process.on('SIGPIPE', () => {});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the Metadata Proxy Server instance
|
||||||
|
*
|
||||||
|
* @param {Function} cb - called with no argument when the onListening event
|
||||||
|
* is triggered
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
start(cb) {
|
||||||
|
if (cluster.isMaster) {
|
||||||
|
for (let i = 0; i < this._configuration.workers; ++i) {
|
||||||
|
cluster.fork();
|
||||||
|
}
|
||||||
|
cluster.on('disconnect', worker => {
|
||||||
|
this._logger
|
||||||
|
.info(`worker ${worker.process.pid} exited, respawning.`);
|
||||||
|
cluster.fork();
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
this._httpServer = new HttpServer(this._configuration.port,
|
||||||
|
this._logger);
|
||||||
|
if (this._configuration.bindAddress) {
|
||||||
|
this._httpServer.setBindAddress(
|
||||||
|
this._configuration.bindAddress);
|
||||||
|
}
|
||||||
|
this._httpServer
|
||||||
|
.onRequest((req, res) => this._proxyRoutes.dispatch(req, res))
|
||||||
|
.onListening(() => {
|
||||||
|
this._logger.info(
|
||||||
|
'Metadata Proxy Server now listening on' +
|
||||||
|
` port ${this._configuration.port}`);
|
||||||
|
if (cb) {
|
||||||
|
return this._metadataWrapper.setup(cb);
|
||||||
|
}
|
||||||
|
return this._metadataWrapper.setup(() => {
|
||||||
|
this._logger.info('MetadataWrapper setup complete.');
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = Server;
|
|
@ -0,0 +1,171 @@
|
||||||
|
const url = require('url');
|
||||||
|
const querystring = require('querystring');
|
||||||
|
const errors = require('../../../errors');
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts components from URI.
|
||||||
|
* @param {string} uri - uri part of the received request
|
||||||
|
* @param {werelogs.Logger} logger -
|
||||||
|
* @return {object} ret - contains up to 4 string properties,
|
||||||
|
* @return {string} ret.namespace
|
||||||
|
* @return {string} ret.context
|
||||||
|
* @return {string} ret.bucketName
|
||||||
|
* @return {string} ret.objectName
|
||||||
|
*/
|
||||||
|
function getURIComponents(uri, logger) {
|
||||||
|
try {
|
||||||
|
if (uri.charAt(0) !== '/') {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
|
const { pathname, query } = url.parse(uri);
|
||||||
|
const options = query ? querystring.parse(query) : {};
|
||||||
|
const typeIndex = pathname.indexOf('/', 1);
|
||||||
|
const bucketIndex = pathname.indexOf('/', typeIndex + 1);
|
||||||
|
const objectIndex = pathname.indexOf('/', bucketIndex + 1);
|
||||||
|
|
||||||
|
if (typeIndex === -1 || typeIndex === pathname.length - 1) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
if (bucketIndex === -1) {
|
||||||
|
return {
|
||||||
|
namespace: pathname.substring(1, typeIndex),
|
||||||
|
context: pathname.substring(typeIndex + 1),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (bucketIndex === pathname.length - 1) {
|
||||||
|
return {
|
||||||
|
namespace: pathname.substring(1, typeIndex),
|
||||||
|
context: pathname.substring(typeIndex + 1, bucketIndex),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (objectIndex === -1) {
|
||||||
|
return {
|
||||||
|
namespace: pathname.substring(1, typeIndex),
|
||||||
|
context: pathname.substring(typeIndex + 1, bucketIndex),
|
||||||
|
bucketName: pathname.substring(bucketIndex + 1),
|
||||||
|
options,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (objectIndex === pathname.length - 1) {
|
||||||
|
return {
|
||||||
|
namespace: pathname.substring(1, typeIndex),
|
||||||
|
context: pathname.substring(typeIndex + 1, bucketIndex),
|
||||||
|
bucketName: pathname.substring(bucketIndex + 1, objectIndex),
|
||||||
|
options,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
namespace: pathname.substring(1, typeIndex),
|
||||||
|
context: pathname.substring(typeIndex + 1, bucketIndex),
|
||||||
|
bucketName: pathname.substring(bucketIndex + 1, objectIndex),
|
||||||
|
objectName: decodeURIComponent(pathname.substring(objectIndex + 1)),
|
||||||
|
options,
|
||||||
|
};
|
||||||
|
} catch (ex) {
|
||||||
|
logger.error('Invalid URI: failed to parse',
|
||||||
|
{ uri, error: ex, errorStack: ex.stack });
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts the body of the request through a callback
|
||||||
|
* @param {http.IncomingMessage} request - request received from bucketclient
|
||||||
|
* @param {Function} cb - function which has an interest in the request body.
|
||||||
|
* The first parameter is err and may be falsey
|
||||||
|
* The second parameter is the body of the request
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
function getRequestBody(request, cb) {
|
||||||
|
const body = [];
|
||||||
|
let bodyLen = 0;
|
||||||
|
request.on('data', data => {
|
||||||
|
body.push(data);
|
||||||
|
bodyLen += data.length;
|
||||||
|
}).on('error', cb).on('end', () => {
|
||||||
|
cb(null, Buffer.concat(body, bodyLen).toString());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emit a log entry corresponding to the end of the request
|
||||||
|
*
|
||||||
|
* @param {werelogs.Logger} logger - instance of the logger that will emit the
|
||||||
|
* log entry
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {object} statusCode - HTTP status code sent back to the client
|
||||||
|
* @param {object} statusMessage - HTTP status message sent back to the client
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
function _logRequestEnd(logger, req, statusCode, statusMessage) {
|
||||||
|
const info = {
|
||||||
|
clientIp: req.socket.remoteAddress,
|
||||||
|
clientPort: req.socket.remotePort,
|
||||||
|
httpMethod: req.method,
|
||||||
|
httpURL: req.url,
|
||||||
|
httpCode: statusCode,
|
||||||
|
httpMessage: statusMessage,
|
||||||
|
};
|
||||||
|
logger.end('finished handling request', info);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request processing exit point, sends back to the client the specified data
|
||||||
|
* and/or error code
|
||||||
|
*
|
||||||
|
* @param {http.IncomingMessage} req - request being processed
|
||||||
|
* @param {http.OutgoingMessage} res - response associated to the request
|
||||||
|
* @param {werelogs.Logger} log - instance of the logger to use
|
||||||
|
* @param {Arsenal.Error} err - if not null, defines the HTTP status
|
||||||
|
* code and message
|
||||||
|
* @param {string} data - if not null, used as the response body. If `data'
|
||||||
|
* isn't a string, it's considered as a JSON object and
|
||||||
|
* it's content get serialized before being sent.
|
||||||
|
* @return {undefined}
|
||||||
|
*/
|
||||||
|
function sendResponse(req, res, log, err, data) {
|
||||||
|
let statusCode;
|
||||||
|
let statusMessage;
|
||||||
|
if (err) {
|
||||||
|
statusCode = err.code;
|
||||||
|
statusMessage = err.message;
|
||||||
|
} else {
|
||||||
|
statusCode = errors.ok.code;
|
||||||
|
statusMessage = errors.ok.message;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (data) {
|
||||||
|
let resData = data;
|
||||||
|
if (typeof resData === 'object') {
|
||||||
|
resData = JSON.stringify(data);
|
||||||
|
} else if (typeof resData === 'number') {
|
||||||
|
resData = resData.toString();
|
||||||
|
}
|
||||||
|
/*
|
||||||
|
* Encoding data to binary provides a hot path to write data
|
||||||
|
* directly to the socket, without node.js trying to encode the data
|
||||||
|
* over and over again.
|
||||||
|
*/
|
||||||
|
const rawData = Buffer.from(resData, 'utf8');
|
||||||
|
/*
|
||||||
|
* Using Buffer.bytelength is not required here because data is binary
|
||||||
|
* encoded, data.length would give us the exact byte length
|
||||||
|
*/
|
||||||
|
res.writeHead(statusCode, statusMessage, {
|
||||||
|
'content-length': rawData.length,
|
||||||
|
});
|
||||||
|
res.write(rawData);
|
||||||
|
} else {
|
||||||
|
res.writeHead(statusCode, statusMessage, { 'content-length': 0 });
|
||||||
|
}
|
||||||
|
return res.end(() => {
|
||||||
|
_logRequestEnd(log, req, statusCode, statusMessage);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
getURIComponents,
|
||||||
|
getRequestBody,
|
||||||
|
sendResponse,
|
||||||
|
};
|
Loading…
Reference in New Issue