Compare commits

...

1 Commits

Author SHA1 Message Date
Electra Chong 4179841021 fix: undefined stream.destroy call
To end streaming in case of error, we were calling an unofficial method of the stream API which was removed and does not exist in the version of node we use. The method is re-added officially in node v.8 but until we upgrade we need to destroy the streams manually, by pushing null for readables and calling stream.end() for writables.
2017-11-07 09:56:30 -08:00
3 changed files with 88 additions and 6 deletions

View File

@ -1,5 +1,20 @@
const stream = require('stream'); const stream = require('stream');
class SubStream extends stream.PassThrough {
constructor(options) {
super(options);
this.on('stopStreamingToAzure', function stopStreamingToAzure() {
this._abortStreaming();
});
}
_abortStreaming() {
this.push(null);
this.end();
}
}
/** /**
* Interface for streaming subparts. * Interface for streaming subparts.
* @class SubStreamInterface * @class SubStreamInterface
@ -14,7 +29,8 @@ class SubStreamInterface {
this._totalLengthCounter = 0; this._totalLengthCounter = 0;
this._lengthCounter = 0; this._lengthCounter = 0;
this._subPartIndex = 0; this._subPartIndex = 0;
this._currentStream = new stream.PassThrough(); this._currentStream = new SubStream();
this._streamingAborted = false;
} }
/** /**
@ -51,12 +67,11 @@ class SubStreamInterface {
* @return {undefined} * @return {undefined}
*/ */
stopStreaming(piper) { stopStreaming(piper) {
this._streamingAborted = true;
if (piper) { if (piper) {
piper.unpipe(); piper.unpipe();
piper.destroy();
} }
this._sourceStream.destroy(); this._currentStream.emit('stopStreamingToAzure');
this._currentStream.destroy();
} }
/** /**
@ -97,7 +112,7 @@ class SubStreamInterface {
this._totalLengthCounter += this._lengthCounter; this._totalLengthCounter += this._lengthCounter;
this._lengthCounter = 0; this._lengthCounter = 0;
this._subPartIndex++; this._subPartIndex++;
this._currentStream = new stream.PassThrough(); this._currentStream = new SubStream();
this.resumeStreaming(); this.resumeStreaming();
return { return {
nextStream: this._currentStream, nextStream: this._currentStream,
@ -111,6 +126,10 @@ class SubStreamInterface {
* @return {undefined} * @return {undefined}
*/ */
write(chunk) { write(chunk) {
if (this._streamingAborted) {
// don't write
return;
}
const ready = this._currentStream.write(chunk); const ready = this._currentStream.write(chunk);
if (!ready) { if (!ready) {

View File

@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
'putting multiple parts'); 'putting multiple parts');
resultsCollector.on('error', (err, subPartIndex) => { resultsCollector.on('error', (err, subPartIndex) => {
streamInterface.stopStreaming(request);
log.error(`Error putting subpart to Azure: ${subPartIndex}`, log.error(`Error putting subpart to Azure: ${subPartIndex}`,
{ error: err.message, dataStoreName }); { error: err.message, dataStoreName });
streamInterface.stopStreaming(request);
if (err.code === 'ContainerNotFound') { if (err.code === 'ContainerNotFound') {
return cb(errors.NoSuchBucket); return cb(errors.NoSuchBucket);
} }

View File

@ -0,0 +1,63 @@
const assert = require('assert');
const stream = require('stream');
const SubStreamInterface =
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');
describe('s3middleware SubStreamInterface.stopStreaming()', () => {
const eventsEmitted = {
sourceStreamUnpiped: false,
currentStreamStopStreamingToAzure: false,
currentStreamEnded: false,
};
const expectedSequence = {
sourceStreamUnpiped: 0,
currentStreamStopStreamingToAzure: 1,
currentStreamEnded: 2,
};
const data = Buffer.alloc(100);
let dataMarker = 0;
let eventSequence = 0;
const mockRequest = new stream.Readable({
read: () => {
if (dataMarker >= data.length) {
return mockRequest.push(null);
}
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
dataMarker += 1;
return undefined;
},
});
const sourceStream = new stream.PassThrough();
const subStreamInterface = new SubStreamInterface(sourceStream);
sourceStream.on('unpipe', () => {
eventsEmitted.sourceStreamUnpiped = eventSequence++;
});
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
});
subStreamInterface._currentStream.on('finish', () => {
eventsEmitted.currentStreamEnded = eventSequence++;
});
it('should stop streaming data and end current stream', done => {
sourceStream.on('data', chunk => {
const currentLength = subStreamInterface.getLengthCounter();
if (currentLength === 10) {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], false);
});
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
return subStreamInterface.stopStreaming(mockRequest);
}
return subStreamInterface.write(chunk);
});
mockRequest.pipe(sourceStream);
setTimeout(() => {
Object.keys(eventsEmitted).forEach(key => {
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
});
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
return done();
}, 1000);
});
});