Compare commits
1 Commits
developmen
...
rel/devVM
Author | SHA1 | Date |
---|---|---|
Electra Chong | 4179841021 |
|
@ -1,5 +1,20 @@
|
|||
const stream = require('stream');
|
||||
|
||||
class SubStream extends stream.PassThrough {
|
||||
constructor(options) {
|
||||
super(options);
|
||||
|
||||
this.on('stopStreamingToAzure', function stopStreamingToAzure() {
|
||||
this._abortStreaming();
|
||||
});
|
||||
}
|
||||
|
||||
_abortStreaming() {
|
||||
this.push(null);
|
||||
this.end();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Interface for streaming subparts.
|
||||
* @class SubStreamInterface
|
||||
|
@ -14,7 +29,8 @@ class SubStreamInterface {
|
|||
this._totalLengthCounter = 0;
|
||||
this._lengthCounter = 0;
|
||||
this._subPartIndex = 0;
|
||||
this._currentStream = new stream.PassThrough();
|
||||
this._currentStream = new SubStream();
|
||||
this._streamingAborted = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -51,12 +67,11 @@ class SubStreamInterface {
|
|||
* @return {undefined}
|
||||
*/
|
||||
stopStreaming(piper) {
|
||||
this._streamingAborted = true;
|
||||
if (piper) {
|
||||
piper.unpipe();
|
||||
piper.destroy();
|
||||
}
|
||||
this._sourceStream.destroy();
|
||||
this._currentStream.destroy();
|
||||
this._currentStream.emit('stopStreamingToAzure');
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,7 +112,7 @@ class SubStreamInterface {
|
|||
this._totalLengthCounter += this._lengthCounter;
|
||||
this._lengthCounter = 0;
|
||||
this._subPartIndex++;
|
||||
this._currentStream = new stream.PassThrough();
|
||||
this._currentStream = new SubStream();
|
||||
this.resumeStreaming();
|
||||
return {
|
||||
nextStream: this._currentStream,
|
||||
|
@ -111,6 +126,10 @@ class SubStreamInterface {
|
|||
* @return {undefined}
|
||||
*/
|
||||
write(chunk) {
|
||||
if (this._streamingAborted) {
|
||||
// don't write
|
||||
return;
|
||||
}
|
||||
const ready = this._currentStream.write(chunk);
|
||||
|
||||
if (!ready) {
|
||||
|
|
|
@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
|
|||
'putting multiple parts');
|
||||
|
||||
resultsCollector.on('error', (err, subPartIndex) => {
|
||||
streamInterface.stopStreaming(request);
|
||||
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
|
||||
{ error: err.message, dataStoreName });
|
||||
streamInterface.stopStreaming(request);
|
||||
if (err.code === 'ContainerNotFound') {
|
||||
return cb(errors.NoSuchBucket);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
const assert = require('assert');
|
||||
const stream = require('stream');
|
||||
const SubStreamInterface =
|
||||
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');
|
||||
|
||||
describe('s3middleware SubStreamInterface.stopStreaming()', () => {
|
||||
const eventsEmitted = {
|
||||
sourceStreamUnpiped: false,
|
||||
currentStreamStopStreamingToAzure: false,
|
||||
currentStreamEnded: false,
|
||||
};
|
||||
const expectedSequence = {
|
||||
sourceStreamUnpiped: 0,
|
||||
currentStreamStopStreamingToAzure: 1,
|
||||
currentStreamEnded: 2,
|
||||
};
|
||||
const data = Buffer.alloc(100);
|
||||
let dataMarker = 0;
|
||||
let eventSequence = 0;
|
||||
const mockRequest = new stream.Readable({
|
||||
read: () => {
|
||||
if (dataMarker >= data.length) {
|
||||
return mockRequest.push(null);
|
||||
}
|
||||
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
|
||||
dataMarker += 1;
|
||||
return undefined;
|
||||
},
|
||||
});
|
||||
const sourceStream = new stream.PassThrough();
|
||||
const subStreamInterface = new SubStreamInterface(sourceStream);
|
||||
sourceStream.on('unpipe', () => {
|
||||
eventsEmitted.sourceStreamUnpiped = eventSequence++;
|
||||
});
|
||||
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
|
||||
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
|
||||
});
|
||||
subStreamInterface._currentStream.on('finish', () => {
|
||||
eventsEmitted.currentStreamEnded = eventSequence++;
|
||||
});
|
||||
it('should stop streaming data and end current stream', done => {
|
||||
sourceStream.on('data', chunk => {
|
||||
const currentLength = subStreamInterface.getLengthCounter();
|
||||
if (currentLength === 10) {
|
||||
Object.keys(eventsEmitted).forEach(key => {
|
||||
assert.strictEqual(eventsEmitted[key], false);
|
||||
});
|
||||
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
|
||||
return subStreamInterface.stopStreaming(mockRequest);
|
||||
}
|
||||
return subStreamInterface.write(chunk);
|
||||
});
|
||||
mockRequest.pipe(sourceStream);
|
||||
setTimeout(() => {
|
||||
Object.keys(eventsEmitted).forEach(key => {
|
||||
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
|
||||
});
|
||||
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
|
||||
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
|
||||
return done();
|
||||
}, 1000);
|
||||
});
|
||||
});
|
Loading…
Reference in New Issue