Compare commits
1 Commits
developmen
...
rel/devVM
Author | SHA1 | Date |
---|---|---|
Electra Chong | 4179841021 |
|
@ -1,5 +1,20 @@
|
||||||
const stream = require('stream');
|
const stream = require('stream');
|
||||||
|
|
||||||
|
class SubStream extends stream.PassThrough {
|
||||||
|
constructor(options) {
|
||||||
|
super(options);
|
||||||
|
|
||||||
|
this.on('stopStreamingToAzure', function stopStreamingToAzure() {
|
||||||
|
this._abortStreaming();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
_abortStreaming() {
|
||||||
|
this.push(null);
|
||||||
|
this.end();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for streaming subparts.
|
* Interface for streaming subparts.
|
||||||
* @class SubStreamInterface
|
* @class SubStreamInterface
|
||||||
|
@ -14,7 +29,8 @@ class SubStreamInterface {
|
||||||
this._totalLengthCounter = 0;
|
this._totalLengthCounter = 0;
|
||||||
this._lengthCounter = 0;
|
this._lengthCounter = 0;
|
||||||
this._subPartIndex = 0;
|
this._subPartIndex = 0;
|
||||||
this._currentStream = new stream.PassThrough();
|
this._currentStream = new SubStream();
|
||||||
|
this._streamingAborted = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -51,12 +67,11 @@ class SubStreamInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
stopStreaming(piper) {
|
stopStreaming(piper) {
|
||||||
|
this._streamingAborted = true;
|
||||||
if (piper) {
|
if (piper) {
|
||||||
piper.unpipe();
|
piper.unpipe();
|
||||||
piper.destroy();
|
|
||||||
}
|
}
|
||||||
this._sourceStream.destroy();
|
this._currentStream.emit('stopStreamingToAzure');
|
||||||
this._currentStream.destroy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -97,7 +112,7 @@ class SubStreamInterface {
|
||||||
this._totalLengthCounter += this._lengthCounter;
|
this._totalLengthCounter += this._lengthCounter;
|
||||||
this._lengthCounter = 0;
|
this._lengthCounter = 0;
|
||||||
this._subPartIndex++;
|
this._subPartIndex++;
|
||||||
this._currentStream = new stream.PassThrough();
|
this._currentStream = new SubStream();
|
||||||
this.resumeStreaming();
|
this.resumeStreaming();
|
||||||
return {
|
return {
|
||||||
nextStream: this._currentStream,
|
nextStream: this._currentStream,
|
||||||
|
@ -111,6 +126,10 @@ class SubStreamInterface {
|
||||||
* @return {undefined}
|
* @return {undefined}
|
||||||
*/
|
*/
|
||||||
write(chunk) {
|
write(chunk) {
|
||||||
|
if (this._streamingAborted) {
|
||||||
|
// don't write
|
||||||
|
return;
|
||||||
|
}
|
||||||
const ready = this._currentStream.write(chunk);
|
const ready = this._currentStream.write(chunk);
|
||||||
|
|
||||||
if (!ready) {
|
if (!ready) {
|
||||||
|
|
|
@ -151,9 +151,9 @@ dataStoreName, log, cb) => {
|
||||||
'putting multiple parts');
|
'putting multiple parts');
|
||||||
|
|
||||||
resultsCollector.on('error', (err, subPartIndex) => {
|
resultsCollector.on('error', (err, subPartIndex) => {
|
||||||
streamInterface.stopStreaming(request);
|
|
||||||
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
|
log.error(`Error putting subpart to Azure: ${subPartIndex}`,
|
||||||
{ error: err.message, dataStoreName });
|
{ error: err.message, dataStoreName });
|
||||||
|
streamInterface.stopStreaming(request);
|
||||||
if (err.code === 'ContainerNotFound') {
|
if (err.code === 'ContainerNotFound') {
|
||||||
return cb(errors.NoSuchBucket);
|
return cb(errors.NoSuchBucket);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,63 @@
|
||||||
|
const assert = require('assert');
|
||||||
|
const stream = require('stream');
|
||||||
|
const SubStreamInterface =
|
||||||
|
require('../../../../lib/s3middleware/azureHelpers/SubStreamInterface');
|
||||||
|
|
||||||
|
describe('s3middleware SubStreamInterface.stopStreaming()', () => {
|
||||||
|
const eventsEmitted = {
|
||||||
|
sourceStreamUnpiped: false,
|
||||||
|
currentStreamStopStreamingToAzure: false,
|
||||||
|
currentStreamEnded: false,
|
||||||
|
};
|
||||||
|
const expectedSequence = {
|
||||||
|
sourceStreamUnpiped: 0,
|
||||||
|
currentStreamStopStreamingToAzure: 1,
|
||||||
|
currentStreamEnded: 2,
|
||||||
|
};
|
||||||
|
const data = Buffer.alloc(100);
|
||||||
|
let dataMarker = 0;
|
||||||
|
let eventSequence = 0;
|
||||||
|
const mockRequest = new stream.Readable({
|
||||||
|
read: () => {
|
||||||
|
if (dataMarker >= data.length) {
|
||||||
|
return mockRequest.push(null);
|
||||||
|
}
|
||||||
|
mockRequest.push(data.slice(dataMarker, dataMarker + 1));
|
||||||
|
dataMarker += 1;
|
||||||
|
return undefined;
|
||||||
|
},
|
||||||
|
});
|
||||||
|
const sourceStream = new stream.PassThrough();
|
||||||
|
const subStreamInterface = new SubStreamInterface(sourceStream);
|
||||||
|
sourceStream.on('unpipe', () => {
|
||||||
|
eventsEmitted.sourceStreamUnpiped = eventSequence++;
|
||||||
|
});
|
||||||
|
subStreamInterface._currentStream.on('stopStreamingToAzure', () => {
|
||||||
|
eventsEmitted.currentStreamStopStreamingToAzure = eventSequence++;
|
||||||
|
});
|
||||||
|
subStreamInterface._currentStream.on('finish', () => {
|
||||||
|
eventsEmitted.currentStreamEnded = eventSequence++;
|
||||||
|
});
|
||||||
|
it('should stop streaming data and end current stream', done => {
|
||||||
|
sourceStream.on('data', chunk => {
|
||||||
|
const currentLength = subStreamInterface.getLengthCounter();
|
||||||
|
if (currentLength === 10) {
|
||||||
|
Object.keys(eventsEmitted).forEach(key => {
|
||||||
|
assert.strictEqual(eventsEmitted[key], false);
|
||||||
|
});
|
||||||
|
assert.strictEqual(mockRequest._readableState.pipesCount, 1);
|
||||||
|
return subStreamInterface.stopStreaming(mockRequest);
|
||||||
|
}
|
||||||
|
return subStreamInterface.write(chunk);
|
||||||
|
});
|
||||||
|
mockRequest.pipe(sourceStream);
|
||||||
|
setTimeout(() => {
|
||||||
|
Object.keys(eventsEmitted).forEach(key => {
|
||||||
|
assert.strictEqual(eventsEmitted[key], expectedSequence[key]);
|
||||||
|
});
|
||||||
|
assert.strictEqual(subStreamInterface.getLengthCounter(), 10);
|
||||||
|
assert.strictEqual(mockRequest._readableState.pipesCount, 0);
|
||||||
|
return done();
|
||||||
|
}, 1000);
|
||||||
|
});
|
||||||
|
});
|
Loading…
Reference in New Issue