Compare commits

...

1 Commits

Author SHA1 Message Date
Mathieu Cassagne 1fe0e7fe88 simplify/rework retrieveData 2018-01-31 17:45:51 +01:00
2 changed files with 57 additions and 58 deletions

View File

@ -1,74 +1,63 @@
const { eachSeries } = require('async');
const { eachOf, eachSeries, each } = require('async');
const responseErr = new Error();
responseErr.code = 'ResponseError';
responseErr.message = 'response closed by client request before all data sent';
function destroyStreams(res, readables) {
each(readables, (r, n) => {
r.destroy();
return n();
});
res.destroy();
}
export default function retrieveData(locations, retrieveDataFn, response, log) {
let responseDestroyed = false;
const _destroyResponse = () => {
// destroys the socket if available
response.destroy();
responseDestroyed = true;
};
const readables = [];
let streamError = false;
response.once('close', () => {
log.debug('received close event before response end');
_destroyResponse();
streamError = true;
});
eachSeries(locations,
(current, next) => retrieveDataFn(current, log, (err, readable) => {
let cbCalled = false;
const _next = err => {
// Avoid multiple callbacks since it's possible that response's
// close event and the readable's end event are emitted at
// the same time.
if (!cbCalled) {
cbCalled = true;
next(err);
}
};
response.once('error', err => {
log.debug('received error event before response end', {
error: err,
});
streamError = true;
});
eachOf(locations,
(loc, key, next) => retrieveDataFn(loc, log, (err, readable) => {
if (err) {
log.error('failed to get object', {
error: err,
method: 'retrieveData',
});
_destroyResponse();
return _next(err);
return next(err);
}
if (responseDestroyed) {
log.debug('response destroyed before readable could stream');
readable.emit('close');
return _next(responseErr);
}
// client closed the connection abruptly
response.once('close', () => {
log.debug('received close event before readable end');
if (!responseDestroyed) {
_destroyResponse();
}
readable.emit('close');
return _next(responseErr);
readable.once('error', () => {
streamError = true;
});
// readable stream successfully consumed
readable.on('end', () => {
log.debug('readable stream end reached');
return _next();
});
// errors on server side with readable stream
readable.on('error', err => {
log.error('error piping data from source');
return _next(err);
});
return readable.pipe(response, { end: false });
readables[key] = readable;
return next();
}), err => {
if (err) {
log.debug('abort response due to client error', {
error: err.code, errMsg: err.message });
if (err || streamError) {
log.error('error from one location, aborting', {
error: err || 'error on readable stream',
});
destroyStreams(response, readables);
return response.end();
}
// call end for all cases (error/success) per node.js docs
// recommendation
response.end();
return eachSeries(readables, (readable, next) => {
readable.once('error', next);
readable.once('end', next);
readable.pipe(response, { end: false });
}, err => {
if (err || streamError) {
log.error('error from stream of one location, aborting', {
error: err || 'error on readable stream',
});
destroyStreams(response, readables);
}
return response.end();
});
}
);
}

View File

@ -14,6 +14,7 @@ const owner = 'accessKey1canonicalID';
const namespace = 'default';
const bucketName = 'bucketname';
const postBody = Buffer.from('I am a body', 'utf8');
const postBody2 = Buffer.from('I am also a body', 'utf8');
const errCode = null;
const overrideHeaders = {};
const resHeaders = {};
@ -26,6 +27,15 @@ const dataStoreEntry = {
},
};
const dataStoreEntry2 = {
value: postBody2,
keyContext: {
bucketName,
owner,
namespace,
},
};
describe('responseStreamData:', () => {
beforeEach(() => {
cleanup();
@ -50,7 +60,7 @@ describe('responseStreamData:', () => {
});
it('should stream full requested object data for two part object', done => {
ds.push(null, dataStoreEntry, dataStoreEntry);
ds.push(null, dataStoreEntry, dataStoreEntry2);
const dataLocations = [
{
key: 1,
@ -69,7 +79,7 @@ describe('responseStreamData:', () => {
});
response.on('end', () => {
const data = response._getData();
const doublePostBody = postBody.toString().concat(postBody);
const doublePostBody = postBody.toString().concat(postBody2);
assert.strictEqual(data, doublePostBody);
done();
});