Compare commits

...

1 Commits

Author SHA1 Message Date
Rahul Padigela 4214282faf bf: close/end readable/response streams on errors
This fixes the leakage of sockets in CLOSE_WAIT state by closing the streams
and destroying the sockets when the client has abruptly closed the connection.

Upstream requests to Azure/AWS need to be aborted in
AzureClient/AWSClient implementation. Currently azure-storage module doesn't
have a clear way of aborting a request.
2018-01-02 15:39:48 -08:00
1 changed files with 69 additions and 40 deletions

View File

@ -1,7 +1,12 @@
const url = require('url'); const url = require('url');
const { eachSeries } = require('async');
const ipCheck = require('../ipCheck'); const ipCheck = require('../ipCheck');
const errors = require('../errors'); const errors = require('../errors');
const responseErr = new Error();
responseErr.code = 'ResponseError';
responseErr.message = 'response closed by client request before all data sent';
/** /**
* setCommonResponseHeaders - Set HTTP response headers * setCommonResponseHeaders - Set HTTP response headers
* @param {object} headers - key and value of new headers to add * @param {object} headers - key and value of new headers to add
@ -242,54 +247,78 @@ function okContentHeadersResponse(overrideParams, resHeaders,
return response; return response;
} }
function retrieveData(locations, dataRetrievalFn, function retrieveData(locations, retrieveDataFn, response, log) {
response, logger, errorHandlerFn) { let responseDestroyed = false;
if (locations.length === 0) { const _destroyResponse = () => {
return response.end(); // destroys the socket if available
response.destroy();
responseDestroyed = true;
};
response.once('close', () => {
log.debug('received close event before response end');
_destroyResponse();
});
eachSeries(locations,
(current, next) => retrieveDataFn(current, log, (err, readable) => {
let cbCalled = false;
const _next = err => {
// Avoid multiple callbacks since it's possible that response's
// close event and the readable's end event are emitted at
// the same time.
if (!cbCalled) {
cbCalled = true;
next(err);
} }
if (errorHandlerFn === undefined) { };
// eslint-disable-next-line
errorHandlerFn = () => { response.connection.destroy(); }; if (err) {
log.error('failed to get object', {
error: err,
method: 'retrieveData',
});
_destroyResponse();
return _next(err);
}
if (responseDestroyed) {
log.debug('response destroyed before readable could stream');
readable.emit('close');
return _next(responseErr);
} }
const current = locations.shift();
if (current.azureStreamingOptions) { if (current.azureStreamingOptions) {
// pipe data directly from source to response // azure-sdk handles the streams
response.on('error', err => { return _next();
logger.error('error piping data from source');
errorHandlerFn(err);
});
return dataRetrievalFn(current, response, logger, err => {
if (err) {
logger.error('failed to get object from source', {
error: err,
method: 'retrieveData',
backend: 'Azure',
});
return errorHandlerFn(err);
} }
return undefined; // client closed the connection abruptly
}); response.once('close', () => {
log.debug('received close event before readable end');
if (!responseDestroyed) {
_destroyResponse();
} }
return dataRetrievalFn(current, response, logger, readable.emit('close');
(err, readable) => { return _next(responseErr);
if (err) {
logger.error('failed to get object', {
error: err,
method: 'retrieveData',
});
return errorHandlerFn(err);
}
readable.on('error', err => {
logger.error('error piping data from source');
errorHandlerFn(err);
}); });
// readable stream successfully consumed
readable.on('end', () => { readable.on('end', () => {
process.nextTick(retrieveData, log.debug('readable stream end reached');
locations, dataRetrievalFn, response, logger); return _next();
}); });
readable.pipe(response, { end: false }); // errors on server side with readable stream
return undefined; readable.on('error', err => {
log.error('error piping data from source');
return _next(err);
}); });
return readable.pipe(response, { end: false });
}), err => {
if (err) {
log.debug('abort response due to client error', {
error: err.code, errMsg: err.message });
}
// call end for all cases (error/success) per node.js docs
// recommendation
response.end();
}
);
} }
function _responseBody(responseBackend, errCode, payload, response, log, function _responseBody(responseBackend, errCode, payload, response, log,